mirror of
https://github.com/jlengrand/ktor.git
synced 2026-03-10 08:31:20 +00:00
Make client websocket feature common
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,5 +1,6 @@
|
||||
build
|
||||
.gradle
|
||||
.gradletasknamecache
|
||||
.idea/*
|
||||
!.idea/runConfigurations
|
||||
!.idea/runConfigurations/*
|
||||
|
||||
@@ -42,7 +42,10 @@ def projectNeedsPlatform(project, platform) {
|
||||
def hasDarwin = files.any { it.name == "darwin" }
|
||||
|
||||
if (hasPosix && hasDarwin) return false
|
||||
if (!hasDarwin && platform == "darwin") return false
|
||||
|
||||
if (hasPosix && platform == "darwin") return false
|
||||
if (hasDarwin && platform == "posix") return false
|
||||
if (!hasPosix && !hasDarwin && platform == "darwin") return false
|
||||
|
||||
return files.any { it.name == "common" || it.name == platform }
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ kotlin.code.style=official
|
||||
|
||||
# config
|
||||
version=1.2.0-SNAPSHOT
|
||||
kotlin.incremental.js=true
|
||||
kotlin.incremental.js=false
|
||||
kotlin.incremental.multiplatform=true
|
||||
|
||||
# gradle
|
||||
|
||||
@@ -90,6 +90,9 @@ prepareMocha.doLast {
|
||||
<script src="$libraryPath/ktor-client.js"></script>
|
||||
<script src="$libraryPath/ktor-client-json.js"></script>
|
||||
<script src="$libraryPath/ktor-client-auth-basic.js"></script>
|
||||
<script src="$libraryPath/ktor-client-tests-dispatcher.js"></script>
|
||||
<script src="$libraryPath/ktor-client-tests.js"></script>
|
||||
<script src="$libraryPath/ktor-client-websocket.js"></script>
|
||||
<script src="$compileTestKotlinJs.outputFile"></script>
|
||||
<script>mocha.run();</script>
|
||||
</body>
|
||||
|
||||
@@ -33,9 +33,18 @@ kotlin {
|
||||
configure([iosArm32Main, iosArm64Main, iosX64Main, macosX64Main, linuxX64Main, mingwX64Main]) {
|
||||
dependsOn posixMain
|
||||
}
|
||||
|
||||
configure([iosArm32Test, iosArm64Test, iosX64Test, macosX64Test, linuxX64Test, mingwX64Test]) {
|
||||
dependsOn posixTest
|
||||
}
|
||||
|
||||
iosArm32Test.dependsOn iosArm32Main
|
||||
iosArm64Test.dependsOn iosArm64Main
|
||||
iosX64Test.dependsOn iosX64Main
|
||||
linuxX64Test.dependsOn linuxX64Main
|
||||
macosX64Test.dependsOn macosX64Main
|
||||
iosX64Test.dependsOn iosX64Main
|
||||
mingwX64Test.dependsOn mingwX64Main
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
|
||||
kotlin.sourceSets.commonMain.dependencies {
|
||||
api project(':ktor-client:ktor-client-core')
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ kotlin.sourceSets {
|
||||
api project(':ktor-client:ktor-client-core')
|
||||
api project(':ktor-http:ktor-http-cio')
|
||||
api project(':ktor-network:ktor-network-tls')
|
||||
api project(':ktor-client:ktor-client-features:ktor-client-websocket')
|
||||
}
|
||||
jvmTest.dependencies {
|
||||
api project(':ktor-client:ktor-client-tests')
|
||||
|
||||
@@ -2,7 +2,9 @@ package io.ktor.client.engine.cio
|
||||
|
||||
import io.ktor.client.call.*
|
||||
import io.ktor.client.engine.*
|
||||
import io.ktor.client.features.websocket.*
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.client.response.*
|
||||
import io.ktor.http.*
|
||||
import io.ktor.network.selector.*
|
||||
import kotlinx.coroutines.*
|
||||
@@ -11,7 +13,7 @@ import java.io.*
|
||||
import java.util.concurrent.*
|
||||
import java.util.concurrent.atomic.*
|
||||
|
||||
internal class CIOEngine(override val config: CIOEngineConfig) : HttpClientJvmEngine("ktor-cio") {
|
||||
internal class CIOEngine(override val config: CIOEngineConfig) : HttpClientJvmEngine("ktor-cio"), WebSocketEngine {
|
||||
private val endpoints = ConcurrentHashMap<String, Endpoint>()
|
||||
|
||||
@UseExperimental(InternalCoroutinesApi::class)
|
||||
@@ -29,7 +31,12 @@ internal class CIOEngine(override val config: CIOEngineConfig) : HttpClientJvmEn
|
||||
return@withContext HttpEngineCall(request, response)
|
||||
}
|
||||
|
||||
private suspend fun executeRequest(request: DefaultHttpRequest): CIOHttpResponse {
|
||||
override suspend fun execute(request: HttpRequest): WebSocketResponse {
|
||||
val response = executeRequest(request)
|
||||
return response as WebSocketResponse
|
||||
}
|
||||
|
||||
private suspend fun executeRequest(request: HttpRequest): HttpResponse {
|
||||
while (true) {
|
||||
if (closed.get()) throw ClientClosedException()
|
||||
|
||||
|
||||
@@ -11,9 +11,10 @@ import kotlin.coroutines.*
|
||||
|
||||
internal class CIOHttpResponse(
|
||||
request: HttpRequest,
|
||||
override val headers: Headers,
|
||||
override val requestTime: GMTDate,
|
||||
override val content: ByteReadChannel,
|
||||
private val response: Response,
|
||||
response: Response,
|
||||
override val coroutineContext: CoroutineContext
|
||||
) : HttpResponse {
|
||||
|
||||
@@ -21,14 +22,7 @@ internal class CIOHttpResponse(
|
||||
|
||||
override val status: HttpStatusCode = HttpStatusCode(response.status, response.statusText.toString())
|
||||
|
||||
override val version: HttpProtocolVersion = HttpProtocolVersion.HTTP_1_1
|
||||
|
||||
override val headers: Headers = Headers.build {
|
||||
val origin = CIOHeaders(response.headers)
|
||||
origin.names().forEach {
|
||||
appendAll(it, origin.getAll(it))
|
||||
}
|
||||
}
|
||||
override val version: HttpProtocolVersion = HttpProtocolVersion.parse(response.version)
|
||||
|
||||
override val responseTime: GMTDate = GMTDate()
|
||||
|
||||
|
||||
@@ -6,17 +6,20 @@ import io.ktor.network.sockets.*
|
||||
import io.ktor.network.sockets.Socket
|
||||
import java.net.*
|
||||
|
||||
internal class ConnectionFactory(private val selector: SelectorManager, maxConnectionsCount: Int) {
|
||||
internal class ConnectionFactory(
|
||||
private val selector: SelectorManager,
|
||||
maxConnectionsCount: Int
|
||||
) {
|
||||
private val semaphore = Semaphore(maxConnectionsCount)
|
||||
|
||||
suspend fun connect(address: InetSocketAddress): Socket {
|
||||
semaphore.enter()
|
||||
return try {
|
||||
aSocket(selector).tcpNoDelay().tcp().connect(address)
|
||||
} catch (t: Throwable) {
|
||||
} catch (cause: Throwable) {
|
||||
// a failure or cancellation
|
||||
semaphore.leave()
|
||||
throw t
|
||||
throw cause
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -76,6 +76,11 @@ internal class ConnectionPipeline(
|
||||
val transferEncoding = rawResponse.headers[HttpHeaders.TransferEncoding]
|
||||
val chunked = transferEncoding == "chunked"
|
||||
val connectionType = ConnectionOptions.parse(rawResponse.headers[HttpHeaders.Connection])
|
||||
val headers = CIOHeaders(rawResponse.headers)
|
||||
|
||||
callContext[Job]?.invokeOnCompletion {
|
||||
rawResponse.release()
|
||||
}
|
||||
|
||||
shouldClose = (connectionType == ConnectionOptions.Close)
|
||||
|
||||
@@ -90,7 +95,7 @@ internal class ConnectionPipeline(
|
||||
} else ByteReadChannel.Empty
|
||||
|
||||
val response = CIOHttpResponse(
|
||||
task.request, requestTime,
|
||||
task.request, headers, requestTime,
|
||||
body,
|
||||
rawResponse,
|
||||
coroutineContext = callContext
|
||||
@@ -99,19 +104,13 @@ internal class ConnectionPipeline(
|
||||
task.response.complete(response)
|
||||
|
||||
responseChannel?.use {
|
||||
try {
|
||||
parseHttpBody(
|
||||
contentLength,
|
||||
transferEncoding,
|
||||
connectionType,
|
||||
networkInput,
|
||||
this
|
||||
)
|
||||
} finally {
|
||||
callContext[Job]?.invokeOnCompletion {
|
||||
rawResponse.release()
|
||||
}
|
||||
}
|
||||
parseHttpBody(
|
||||
contentLength,
|
||||
transferEncoding,
|
||||
connectionType,
|
||||
networkInput,
|
||||
this
|
||||
)
|
||||
}
|
||||
|
||||
skipTask?.join()
|
||||
|
||||
@@ -1,19 +1,22 @@
|
||||
package io.ktor.client.engine.cio
|
||||
|
||||
import io.ktor.client.features.websocket.*
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.client.response.*
|
||||
import io.ktor.http.*
|
||||
import io.ktor.http.cio.*
|
||||
import io.ktor.http.cio.websocket.*
|
||||
import io.ktor.network.sockets.*
|
||||
import io.ktor.network.sockets.Socket
|
||||
import io.ktor.network.tls.*
|
||||
import io.ktor.util.*
|
||||
import io.ktor.util.date.*
|
||||
import kotlinx.atomicfu.*
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.*
|
||||
import kotlinx.coroutines.io.*
|
||||
import java.io.*
|
||||
import java.net.*
|
||||
import java.util.concurrent.atomic.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
internal class Endpoint(
|
||||
@@ -25,14 +28,13 @@ internal class Endpoint(
|
||||
override val coroutineContext: CoroutineContext,
|
||||
private val onDone: () -> Unit
|
||||
) : CoroutineScope, Closeable {
|
||||
private val address = InetSocketAddress(host, port)
|
||||
|
||||
private val connections: AtomicInt = atomic(0)
|
||||
private val tasks: Channel<RequestTask> = Channel(Channel.UNLIMITED)
|
||||
private val deliveryPoint: Channel<RequestTask> = Channel()
|
||||
private val maxEndpointIdleTime = 2 * config.endpoint.connectTimeout
|
||||
|
||||
@Volatile
|
||||
private var connectionsHolder: Int = 0
|
||||
|
||||
private val address = InetSocketAddress(host, port)
|
||||
private val maxEndpointIdleTime: Long = 2 * config.endpoint.connectTimeout
|
||||
|
||||
private val postman = launch(start = CoroutineStart.LAZY) {
|
||||
try {
|
||||
@@ -64,8 +66,8 @@ internal class Endpoint(
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun execute(request: DefaultHttpRequest, callContext: CoroutineContext): CIOHttpResponse {
|
||||
val result = CompletableDeferred<CIOHttpResponse>(parent = callContext[Job])
|
||||
suspend fun execute(request: HttpRequest, callContext: CoroutineContext): HttpResponse {
|
||||
val result = CompletableDeferred<HttpResponse>(parent = callContext[Job])
|
||||
val task = RequestTask(request, result, callContext)
|
||||
tasks.offer(task)
|
||||
return result.await()
|
||||
@@ -74,7 +76,7 @@ internal class Endpoint(
|
||||
private suspend fun makePipelineRequest(task: RequestTask) {
|
||||
if (deliveryPoint.offer(task)) return
|
||||
|
||||
val connections = Connections.get(this@Endpoint)
|
||||
val connections = connections.value
|
||||
if (connections < config.endpoint.maxConnectionsPerRoute) {
|
||||
try {
|
||||
createPipeline()
|
||||
@@ -114,18 +116,20 @@ internal class Endpoint(
|
||||
val contentLength = rawResponse.headers[HttpHeaders.ContentLength]?.toString()?.toLong() ?: -1L
|
||||
val transferEncoding = rawResponse.headers[HttpHeaders.TransferEncoding]
|
||||
val connectionType = ConnectionOptions.parse(rawResponse.headers[HttpHeaders.Connection])
|
||||
val headers = CIOHeaders(rawResponse.headers)
|
||||
|
||||
callContext[Job]!!.invokeOnCompletion {
|
||||
rawResponse.headers.release()
|
||||
}
|
||||
|
||||
if (status == HttpStatusCode.SwitchingProtocols.value) {
|
||||
val session = RawWebSocket(input, output, masking = true, coroutineContext = callContext)
|
||||
response.complete(WebSocketResponse(callContext, requestTime, session))
|
||||
return@launch
|
||||
}
|
||||
|
||||
|
||||
val body = when {
|
||||
status == HttpStatusCode.SwitchingProtocols.value -> {
|
||||
val content = request.content as? ClientUpgradeContent
|
||||
?: error("Invalid content type: UpgradeContent required")
|
||||
|
||||
launch {
|
||||
content.pipeTo(output)
|
||||
}.invokeOnCompletion(::closeConnection)
|
||||
|
||||
input
|
||||
}
|
||||
request.method == HttpMethod.Head -> {
|
||||
closeConnection()
|
||||
ByteReadChannel.Empty
|
||||
@@ -140,12 +144,12 @@ internal class Endpoint(
|
||||
}
|
||||
}
|
||||
|
||||
response.complete(
|
||||
CIOHttpResponse(
|
||||
request, requestTime, body, rawResponse,
|
||||
coroutineContext = callContext
|
||||
)
|
||||
val result = CIOHttpResponse(
|
||||
request, headers, requestTime, body, rawResponse,
|
||||
coroutineContext = callContext
|
||||
)
|
||||
|
||||
response.complete(result)
|
||||
} catch (cause: Throwable) {
|
||||
response.completeExceptionally(cause)
|
||||
}
|
||||
@@ -168,7 +172,7 @@ internal class Endpoint(
|
||||
val retryAttempts = config.endpoint.connectRetryAttempts
|
||||
val connectTimeout = config.endpoint.connectTimeout
|
||||
|
||||
Connections.incrementAndGet(this)
|
||||
connections.incrementAndGet()
|
||||
|
||||
try {
|
||||
repeat(retryAttempts) {
|
||||
@@ -187,23 +191,28 @@ internal class Endpoint(
|
||||
address.hostName
|
||||
)
|
||||
}
|
||||
} catch (t: Throwable) {
|
||||
} catch (cause: Throwable) {
|
||||
try {
|
||||
connection.close()
|
||||
} catch (_: Throwable) {
|
||||
}
|
||||
|
||||
connectionFactory.release()
|
||||
throw t
|
||||
throw cause
|
||||
}
|
||||
}
|
||||
} catch (cause: Throwable) {
|
||||
Connections.decrementAndGet(this)
|
||||
connections.decrementAndGet()
|
||||
throw cause
|
||||
}
|
||||
|
||||
Connections.decrementAndGet(this)
|
||||
connections.decrementAndGet()
|
||||
throw ConnectException()
|
||||
}
|
||||
|
||||
private fun releaseConnection() {
|
||||
connectionFactory.release()
|
||||
Connections.decrementAndGet(this)
|
||||
connections.decrementAndGet()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
@@ -213,12 +222,8 @@ internal class Endpoint(
|
||||
init {
|
||||
postman.start()
|
||||
}
|
||||
|
||||
companion object {
|
||||
private val Connections = AtomicIntegerFieldUpdater
|
||||
.newUpdater(Endpoint::class.java, Endpoint::connectionsHolder.name)
|
||||
}
|
||||
}
|
||||
|
||||
@KtorExperimentalAPI
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
class ConnectException : Exception("Connect timed out or retry attempts exceeded")
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
package io.ktor.client.engine.cio
|
||||
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.client.response.*
|
||||
import io.ktor.http.*
|
||||
import io.ktor.util.date.*
|
||||
import kotlinx.coroutines.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
internal data class RequestTask(
|
||||
val request: DefaultHttpRequest,
|
||||
val response: CompletableDeferred<CIOHttpResponse>,
|
||||
val request: HttpRequest,
|
||||
val response: CompletableDeferred<HttpResponse>,
|
||||
val context: CoroutineContext
|
||||
)
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ import io.ktor.http.content.*
|
||||
import kotlinx.coroutines.io.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
internal suspend fun DefaultHttpRequest.write(output: ByteWriteChannel, callContext: CoroutineContext) {
|
||||
internal suspend fun HttpRequest.write(output: ByteWriteChannel, callContext: CoroutineContext) {
|
||||
val builder = RequestResponseBuilder()
|
||||
|
||||
val contentLength = headers[HttpHeaders.ContentLength] ?: content.contentLength?.toString()
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
import io.ktor.client.*
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.http.*
|
||||
|
||||
suspend fun HttpClient.webSocketRawSession(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
block: HttpRequestBuilder.() -> Unit = {}
|
||||
): ClientWebSocketSession = request {
|
||||
this.method = method
|
||||
url("ws", host, port, path)
|
||||
block()
|
||||
}
|
||||
|
||||
suspend fun HttpClient.webSocketRaw(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
request: HttpRequestBuilder.() -> Unit = {}, block: suspend ClientWebSocketSession.() -> Unit
|
||||
): Unit {
|
||||
val session = webSocketRawSession(method, host, port, path) {
|
||||
url.protocol = URLProtocol.WS
|
||||
url.port = port
|
||||
|
||||
request()
|
||||
}
|
||||
|
||||
try {
|
||||
session.block()
|
||||
} catch (cause: Throwable) {
|
||||
session.close(cause)
|
||||
} finally {
|
||||
session.close()
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun HttpClient.wsRaw(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
request: HttpRequestBuilder.() -> Unit = {}, block: suspend ClientWebSocketSession.() -> Unit
|
||||
): Unit = webSocketRaw(method, host, port, path, request, block)
|
||||
|
||||
suspend fun HttpClient.wssRaw(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
request: HttpRequestBuilder.() -> Unit = {}, block: suspend ClientWebSocketSession.() -> Unit
|
||||
): Unit = webSocketRaw(method, host, port, path, request = {
|
||||
url.protocol = URLProtocol.WSS
|
||||
url.port = port
|
||||
|
||||
request()
|
||||
}, block = block)
|
||||
@@ -52,14 +52,16 @@ fun HttpClient(
|
||||
* Asynchronous client to perform HTTP requests.
|
||||
*
|
||||
* This is a generic implementation that uses a specific engine [HttpClientEngine].
|
||||
* @property engine: [HttpClientEngine] for executing requests.
|
||||
*/
|
||||
class HttpClient(
|
||||
private val engine: HttpClientEngine,
|
||||
@InternalAPI val engine: HttpClientEngine,
|
||||
private val userConfig: HttpClientConfig<out HttpClientEngineConfig> = HttpClientConfig()
|
||||
) : CoroutineScope, Closeable {
|
||||
private val closed = atomic(false)
|
||||
|
||||
override val coroutineContext: CoroutineContext get() = engine.coroutineContext
|
||||
|
||||
/**
|
||||
* Pipeline used for processing all the requests sent by this client.
|
||||
*/
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.ktor.client.*
|
||||
import io.ktor.client.features.*
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.client.response.*
|
||||
import io.ktor.util.*
|
||||
import kotlinx.atomicfu.*
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.io.core.*
|
||||
@@ -13,9 +14,9 @@ import kotlin.reflect.*
|
||||
/**
|
||||
* A class that represents a single pair of [request] and [response] for a specific [HttpClient].
|
||||
*
|
||||
* [client] - client that executed the call.
|
||||
* @property client: client that executed the call.
|
||||
*/
|
||||
class HttpClientCall internal constructor(
|
||||
open class HttpClientCall constructor(
|
||||
val client: HttpClient
|
||||
) : CoroutineScope, Closeable {
|
||||
private val received = atomic(false)
|
||||
@@ -23,7 +24,12 @@ class HttpClientCall internal constructor(
|
||||
override val coroutineContext: CoroutineContext get() = response.coroutineContext
|
||||
|
||||
/**
|
||||
* Represents the [request] sent by the client.
|
||||
* Typed [Attributes] associated to this call serving as a lightweight container.
|
||||
*/
|
||||
val attributes: Attributes get() = request.attributes
|
||||
|
||||
/**
|
||||
* Represents the [request] sent by the client
|
||||
*/
|
||||
lateinit var request: HttpRequest
|
||||
internal set
|
||||
@@ -70,6 +76,12 @@ class HttpClientCall internal constructor(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Raw http call produced by engine.
|
||||
*
|
||||
* @property request - executed http request.
|
||||
* @property response - raw http response
|
||||
*/
|
||||
data class HttpEngineCall(val request: HttpRequest, val response: HttpResponse)
|
||||
|
||||
/**
|
||||
@@ -98,6 +110,7 @@ suspend inline fun <reified T> HttpResponse.receive(): T = call.receive(typeInfo
|
||||
/**
|
||||
* Exception representing that the response payload has already been received.
|
||||
*/
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
class DoubleReceiveException(call: HttpClientCall) : IllegalStateException() {
|
||||
override val message: String = "Response already received: $call"
|
||||
}
|
||||
@@ -106,6 +119,7 @@ class DoubleReceiveException(call: HttpClientCall) : IllegalStateException() {
|
||||
* Exception representing fail of the response pipeline
|
||||
* [cause] contains origin pipeline exception
|
||||
*/
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
class ReceivePipelineException(
|
||||
val request: HttpClientCall,
|
||||
val info: TypeInfo,
|
||||
@@ -116,6 +130,7 @@ class ReceivePipelineException(
|
||||
* Exception representing the no transformation was found.
|
||||
* It includes the received type and the expected type as part of the message.
|
||||
*/
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
class NoTransformationFoundException(from: KClass<*>, to: KClass<*>) : UnsupportedOperationException() {
|
||||
override val message: String? = "No transformation found: $from -> $to"
|
||||
}
|
||||
@@ -125,4 +140,5 @@ class NoTransformationFoundException(from: KClass<*>, to: KClass<*>) : Unsupport
|
||||
ReplaceWith("NoTransformationFoundException"),
|
||||
DeprecationLevel.ERROR
|
||||
)
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
typealias NoTransformationFound = NoTransformationFoundException
|
||||
|
||||
@@ -9,6 +9,10 @@ import io.ktor.http.content.*
|
||||
class UnsupportedContentTypeException(content: OutgoingContent) :
|
||||
IllegalStateException("Failed to write body: ${content::class}")
|
||||
|
||||
class UnsupportedUpgradeProtocolException(
|
||||
url: Url
|
||||
) : IllegalArgumentException("Unsupported upgrade protocol exception: $url")
|
||||
|
||||
/**
|
||||
* Constructs a [HttpClientCall] from this [HttpClient] and
|
||||
* with the specified HTTP request [builder].
|
||||
|
||||
@@ -61,7 +61,7 @@ class HttpSend(
|
||||
do {
|
||||
callChanged = false
|
||||
|
||||
passInterceptors@for (interceptor in feature.interceptors) {
|
||||
passInterceptors@ for (interceptor in feature.interceptors) {
|
||||
val transformed = interceptor(sender, currentCall)
|
||||
if (transformed === currentCall) continue@passInterceptors
|
||||
|
||||
@@ -77,7 +77,7 @@ class HttpSend(
|
||||
}
|
||||
|
||||
private class DefaultSender(private val maxSendCount: Int, private val client: HttpClient) : Sender {
|
||||
private var sentCount = 0
|
||||
private var sentCount: Int = 0
|
||||
|
||||
override suspend fun execute(requestBuilder: HttpRequestBuilder): HttpClientCall {
|
||||
if (sentCount >= maxSendCount) throw SendCountExceedException("Max send count $maxSendCount exceeded")
|
||||
|
||||
@@ -5,7 +5,7 @@ import io.ktor.http.*
|
||||
import kotlinx.coroutines.io.*
|
||||
|
||||
abstract class ClientUpgradeContent : OutgoingContent.NoContent() {
|
||||
private val content: ByteChannel = ByteChannel()
|
||||
private val content: ByteChannel by lazy { ByteChannel() }
|
||||
|
||||
val output: ByteWriteChannel get() = content
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ interface HttpRequest : HttpMessage, CoroutineScope {
|
||||
val url: Url
|
||||
|
||||
/**
|
||||
* Typed [Attributes] associated to this request serving as a lightweight container.
|
||||
* Typed [Attributes] associated to this call serving as a lightweight container.
|
||||
*/
|
||||
val attributes: Attributes
|
||||
|
||||
@@ -44,13 +44,13 @@ interface HttpRequest : HttpMessage, CoroutineScope {
|
||||
level = DeprecationLevel.ERROR,
|
||||
replaceWith = ReplaceWith("coroutineContext")
|
||||
)
|
||||
val executionContext: Job get() = TODO()
|
||||
val executionContext: Job
|
||||
get() = TODO()
|
||||
|
||||
/**
|
||||
* An [OutgoingContent] representing the request body
|
||||
*/
|
||||
val content: OutgoingContent
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -8,7 +8,6 @@ import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.io.*
|
||||
import kotlinx.io.charsets.*
|
||||
import kotlinx.io.core.*
|
||||
import kotlinx.io.core.Closeable
|
||||
|
||||
/**
|
||||
* A response for [HttpClient], second part of [HttpClientCall].
|
||||
@@ -49,13 +48,15 @@ interface HttpResponse : HttpMessage, CoroutineScope, Closeable {
|
||||
replaceWith = ReplaceWith("coroutineContext"),
|
||||
level = DeprecationLevel.ERROR
|
||||
)
|
||||
val executionContext: Job get() = coroutineContext[Job]!!
|
||||
val executionContext: Job
|
||||
get() = coroutineContext[Job]!!
|
||||
|
||||
/**
|
||||
* [ByteReadChannel] with the payload of the response.
|
||||
*/
|
||||
val content: ByteReadChannel
|
||||
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
override fun close() {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
(coroutineContext[Job] as CompletableDeferred<Unit>).complete(Unit)
|
||||
|
||||
@@ -11,7 +11,6 @@ import io.ktor.client.engine.*
|
||||
@HttpClientDsl
|
||||
actual fun HttpClient(
|
||||
block: HttpClientConfig<*>.() -> Unit
|
||||
): HttpClient = engines.firstOrNull()?.let { HttpClient(it, block) }
|
||||
?: error(
|
||||
"Failed to find HttpClientEngineContainer. Consider adding [HttpClientEngine] implementation in dependencies."
|
||||
)
|
||||
): HttpClient = engines.firstOrNull()?.let { HttpClient(it, block) } ?: error(
|
||||
"Failed to find HttpClientEngineContainer. Consider adding [HttpClientEngine] implementation in dependencies."
|
||||
)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.ktor.client.engine
|
||||
|
||||
import kotlin.native.concurrent.*
|
||||
import io.ktor.util.*
|
||||
|
||||
@InternalAPI
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.ktor.client.engine.curl
|
||||
|
||||
import kotlin.native.concurrent.*
|
||||
import io.ktor.client.engine.*
|
||||
import libcurl.*
|
||||
|
||||
@@ -14,6 +15,10 @@ private val curlGlobalInitReturnCode = curl_global_init(CURL_GLOBAL_ALL)
|
||||
@ThreadLocal
|
||||
private val initHook = Curl
|
||||
|
||||
/**
|
||||
* [HttpClientEngineFactory] using a curl library in implementation
|
||||
* with the the associated configuration [HttpClientEngineConfig].
|
||||
*/
|
||||
object Curl : HttpClientEngineFactory<HttpClientEngineConfig> {
|
||||
|
||||
init {
|
||||
|
||||
@@ -1,12 +1,16 @@
|
||||
description = "Ktor websocket support"
|
||||
|
||||
kotlin.sourceSets {
|
||||
jvmMain.dependencies {
|
||||
commonMain.dependencies {
|
||||
api project(':ktor-client:ktor-client-core')
|
||||
api project(':ktor-http:ktor-http-cio')
|
||||
}
|
||||
commonTest.dependencies {
|
||||
api project(':ktor-client:ktor-client-tests')
|
||||
}
|
||||
jvmTest.dependencies {
|
||||
api project(':ktor-client:ktor-client-cio')
|
||||
api project(':ktor-client:ktor-client-okhttp')
|
||||
api project(':ktor-features:ktor-websockets')
|
||||
api project(':ktor-client:ktor-client-tests')
|
||||
}
|
||||
|
||||
@@ -19,8 +19,8 @@ interface ClientWebSocketSession : WebSocketSession {
|
||||
class DefaultClientWebSocketSession(
|
||||
override val call: HttpClientCall,
|
||||
delegate: DefaultWebSocketSession
|
||||
) : ClientWebSocketSession, DefaultWebSocketSession by delegate {
|
||||
init {
|
||||
masking = true
|
||||
}
|
||||
}
|
||||
) : ClientWebSocketSession, DefaultWebSocketSession by delegate
|
||||
|
||||
internal class DelegatingClientWebSocketSession(
|
||||
override val call: HttpClientCall, session: WebSocketSession
|
||||
) : ClientWebSocketSession, WebSocketSession by session
|
||||
@@ -0,0 +1,6 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
import io.ktor.client.*
|
||||
import io.ktor.client.call.*
|
||||
|
||||
internal class WebSocketCall(client: HttpClient) : HttpClientCall(client)
|
||||
@@ -4,17 +4,14 @@ import io.ktor.client.request.*
|
||||
import io.ktor.http.*
|
||||
import io.ktor.http.websocket.*
|
||||
import io.ktor.util.*
|
||||
import java.util.*
|
||||
|
||||
private const val WEBSOCKET_VERSION = "13"
|
||||
private const val NONCE_SIZE = 16
|
||||
|
||||
class WebSocketContent: ClientUpgradeContent() {
|
||||
|
||||
internal class WebSocketContent : ClientUpgradeContent() {
|
||||
private val nonce: String = buildString {
|
||||
val bytes = ByteArray(NONCE_SIZE)
|
||||
random.nextBytes(bytes)
|
||||
append(encodeBase64(bytes))
|
||||
val nonce = generateNonce(NONCE_SIZE)
|
||||
append(nonce.encodeBase64())
|
||||
}
|
||||
|
||||
override val headers: Headers = HeadersBuilder().apply {
|
||||
@@ -27,15 +24,11 @@ class WebSocketContent: ClientUpgradeContent() {
|
||||
|
||||
override fun verify(headers: Headers) {
|
||||
val serverAccept = headers[HttpHeaders.SecWebSocketAccept]
|
||||
?: error("Server should specify header ${HttpHeaders.SecWebSocketAccept}")
|
||||
?: error("Server should specify header ${HttpHeaders.SecWebSocketAccept}")
|
||||
|
||||
val expectedAccept = websocketServerAccept(nonce)
|
||||
check(expectedAccept == serverAccept) {
|
||||
"Failed to verify server accept header. Expected: $expectedAccept, received: $serverAccept"
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
private val random = Random()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
import io.ktor.client.request.*
|
||||
import kotlinx.coroutines.*
|
||||
|
||||
internal expect fun findWebSocketEngine(): WebSocketEngine
|
||||
|
||||
/**
|
||||
* Client engine implementing WebSocket protocol.
|
||||
* RFC: https://tools.ietf.org/html/rfc6455
|
||||
*/
|
||||
interface WebSocketEngine : CoroutineScope {
|
||||
/**
|
||||
* Execute WebSocket protocol [request].
|
||||
*/
|
||||
suspend fun execute(request: HttpRequest): WebSocketResponse
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
import io.ktor.client.call.*
|
||||
import io.ktor.client.response.*
|
||||
import io.ktor.http.*
|
||||
import io.ktor.http.cio.websocket.*
|
||||
import io.ktor.util.date.*
|
||||
import kotlinx.coroutines.io.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
/**
|
||||
* Response produced by [WebSocketEngine].
|
||||
*
|
||||
* @property session - connection [WebSocketSession].
|
||||
*/
|
||||
class WebSocketResponse(
|
||||
override val coroutineContext: CoroutineContext,
|
||||
override val requestTime: GMTDate,
|
||||
val session: WebSocketSession,
|
||||
override val headers: Headers = Headers.Empty,
|
||||
override val status: HttpStatusCode = HttpStatusCode.SwitchingProtocols,
|
||||
override val version: HttpProtocolVersion = HttpProtocolVersion.HTTP_1_1
|
||||
) : HttpResponse {
|
||||
|
||||
override lateinit var call: HttpClientCall
|
||||
internal set
|
||||
|
||||
override val responseTime: GMTDate = GMTDate()
|
||||
|
||||
override val content: ByteReadChannel
|
||||
get() = throw WebSocketException(
|
||||
"Bytes from [content] is not available in [WebSocketResponse]. Consider using [session] instead."
|
||||
)
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
|
||||
import io.ktor.client.*
|
||||
import io.ktor.client.features.*
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.client.response.*
|
||||
import io.ktor.http.*
|
||||
import io.ktor.http.cio.websocket.*
|
||||
import io.ktor.util.*
|
||||
import io.ktor.util.pipeline.*
|
||||
|
||||
/**
|
||||
* Client WebSocket feature.
|
||||
*
|
||||
* @property pingInterval - interval between [FrameType.PING] messages.
|
||||
* @property maxFrameSize - max size of single websocket frame.
|
||||
*/
|
||||
@KtorExperimentalAPI
|
||||
@UseExperimental(WebSocketInternalAPI::class)
|
||||
class WebSockets(
|
||||
val pingInterval: Long = -1L,
|
||||
val maxFrameSize: Long = Int.MAX_VALUE.toLong()
|
||||
) {
|
||||
internal val engine: WebSocketEngine by lazy { findWebSocketEngine() }
|
||||
|
||||
private suspend fun execute(client: HttpClient, content: HttpRequestData): WebSocketCall {
|
||||
val clientEngine = client.engine
|
||||
val currentEngine = if (clientEngine is WebSocketEngine) clientEngine else engine
|
||||
|
||||
val result = WebSocketCall(client)
|
||||
val request = DefaultHttpRequest(result, content)
|
||||
|
||||
val response = currentEngine.execute(request).apply {
|
||||
call = result
|
||||
}
|
||||
|
||||
result.response = response
|
||||
return result
|
||||
}
|
||||
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
companion object Feature : HttpClientFeature<Unit, WebSockets> {
|
||||
override val key: AttributeKey<WebSockets> = AttributeKey("Websocket")
|
||||
|
||||
override fun prepare(block: Unit.() -> Unit): WebSockets = WebSockets()
|
||||
|
||||
override fun install(feature: WebSockets, scope: HttpClient) {
|
||||
scope.requestPipeline.intercept(HttpRequestPipeline.Render) { _ ->
|
||||
if (!context.url.protocol.isWebsocket()) return@intercept
|
||||
|
||||
proceedWith(WebSocketContent())
|
||||
}
|
||||
|
||||
val WebSocket = PipelinePhase("WebSocket")
|
||||
scope.sendPipeline.insertPhaseBefore(HttpSendPipeline.Engine, WebSocket)
|
||||
scope.sendPipeline.intercept(WebSocket) { content ->
|
||||
if (content !is WebSocketContent) return@intercept
|
||||
finish()
|
||||
|
||||
context.body = content
|
||||
val requestData = context.build()
|
||||
|
||||
proceedWith(feature.execute(scope, requestData))
|
||||
}
|
||||
|
||||
scope.responsePipeline.intercept(HttpResponsePipeline.Transform) { (info, response) ->
|
||||
if (response !is WebSocketResponse) return@intercept
|
||||
|
||||
with(feature) {
|
||||
val session = response.session
|
||||
val expected = info.type
|
||||
|
||||
if (expected == DefaultClientWebSocketSession::class) {
|
||||
val clientSession = DefaultClientWebSocketSession(context, session.asDefault())
|
||||
proceedWith(HttpResponseContainer(info, clientSession))
|
||||
return@intercept
|
||||
}
|
||||
|
||||
proceedWith(HttpResponseContainer(info, DelegatingClientWebSocketSession(context, session)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun WebSocketSession.asDefault(): DefaultWebSocketSession {
|
||||
if (this is DefaultWebSocketSession) return this
|
||||
return DefaultWebSocketSession(this, pingInterval, maxFrameSize)
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
class WebSocketException(message: String) : IllegalStateException(message)
|
||||
@@ -0,0 +1,65 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
import io.ktor.client.*
|
||||
import io.ktor.client.features.*
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.http.*
|
||||
import io.ktor.http.cio.websocket.*
|
||||
import kotlinx.coroutines.*
|
||||
|
||||
/**
|
||||
* Open [DefaultClientWebSocketSession].
|
||||
*/
|
||||
@UseExperimental(WebSocketInternalAPI::class)
|
||||
suspend fun HttpClient.webSocketSession(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
block: HttpRequestBuilder.() -> Unit = {}
|
||||
): DefaultClientWebSocketSession = request {
|
||||
this.method = method
|
||||
url("ws", host, port, path)
|
||||
block()
|
||||
}
|
||||
|
||||
/**
|
||||
* Open [block] with [DefaultClientWebSocketSession].
|
||||
*/
|
||||
suspend fun HttpClient.webSocket(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
request: HttpRequestBuilder.() -> Unit = {}, block: suspend DefaultClientWebSocketSession.() -> Unit
|
||||
): Unit {
|
||||
val session = webSocketSession(method, host, port, path) {
|
||||
url.protocol = URLProtocol.WS
|
||||
url.port = port
|
||||
request()
|
||||
}
|
||||
|
||||
try {
|
||||
session.block()
|
||||
} catch (cause: Throwable) {
|
||||
session.close(cause)
|
||||
throw cause
|
||||
} finally {
|
||||
session.close(null)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Open [DefaultClientWebSocketSession].
|
||||
*/
|
||||
suspend fun HttpClient.ws(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
request: HttpRequestBuilder.() -> Unit = {}, block: suspend DefaultClientWebSocketSession.() -> Unit
|
||||
): Unit = webSocket(method, host, port, path, request, block)
|
||||
|
||||
/**
|
||||
* Open secure [DefaultClientWebSocketSession].
|
||||
*/
|
||||
suspend fun HttpClient.wss(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
request: HttpRequestBuilder.() -> Unit = {}, block: suspend DefaultClientWebSocketSession.() -> Unit
|
||||
): Unit = webSocket(method, host, port, path, request = {
|
||||
url.protocol = URLProtocol.WSS
|
||||
url.port = port
|
||||
|
||||
request()
|
||||
}, block = block)
|
||||
@@ -0,0 +1,59 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
import io.ktor.client.tests.utils.*
|
||||
import io.ktor.http.cio.websocket.*
|
||||
import kotlinx.io.core.*
|
||||
import kotlin.test.*
|
||||
|
||||
class WebSocketRemoteTest {
|
||||
|
||||
@Test
|
||||
fun testRemotePingPong() = clientsTest {
|
||||
val remote = "echo.websocket.org"
|
||||
|
||||
config {
|
||||
install(WebSockets)
|
||||
}
|
||||
|
||||
test { client ->
|
||||
client.ws(host = remote) {
|
||||
repeat(10) {
|
||||
ping(it.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testSecureRemotePingPong() = clientsTest {
|
||||
val remote = "echo.websocket.org"
|
||||
|
||||
config {
|
||||
install(WebSockets)
|
||||
}
|
||||
|
||||
test { client ->
|
||||
client.wss(host = remote) {
|
||||
repeat(10) {
|
||||
ping(it.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun WebSocketSession.ping(salt: String) {
|
||||
outgoing.send(Frame.Text("text: $salt"))
|
||||
val frame = incoming.receive()
|
||||
check(frame is Frame.Text)
|
||||
|
||||
assertEquals("text: $salt", frame.readText())
|
||||
|
||||
val data = "text: $salt".toByteArray()
|
||||
outgoing.send(Frame.Binary(true, data))
|
||||
val binaryFrame = incoming.receive()
|
||||
check(binaryFrame is Frame.Binary)
|
||||
|
||||
val buffer = binaryFrame.data
|
||||
assertEquals(data.contentToString(), buffer.contentToString())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
import io.ktor.http.cio.websocket.*
|
||||
import io.ktor.util.*
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.*
|
||||
import kotlinx.io.core.*
|
||||
import org.khronos.webgl.*
|
||||
import org.w3c.dom.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
internal class JsWebSocketSession(
|
||||
override val coroutineContext: CoroutineContext,
|
||||
private val websocket: WebSocket
|
||||
) : DefaultWebSocketSession {
|
||||
private val _closeReason: CompletableDeferred<CloseReason> = CompletableDeferred()
|
||||
private val _incoming: Channel<Frame> = Channel(Channel.UNLIMITED)
|
||||
private val _outgoing: Channel<Frame> = Channel(Channel.UNLIMITED)
|
||||
|
||||
override val incoming: ReceiveChannel<Frame> = _incoming
|
||||
override val outgoing: SendChannel<Frame> = _outgoing
|
||||
|
||||
override val closeReason: Deferred<CloseReason?> = _closeReason
|
||||
|
||||
init {
|
||||
websocket.binaryType = BinaryType.ARRAYBUFFER
|
||||
|
||||
websocket.onmessage = { event ->
|
||||
launch {
|
||||
val data = event.data
|
||||
|
||||
val frame: Frame = when (data) {
|
||||
is ArrayBuffer -> Frame.Binary(false, Int8Array(data) as ByteArray)
|
||||
is String -> Frame.Text(event.data as String)
|
||||
else -> error("Unknown frame type: ${event.type}")
|
||||
}
|
||||
|
||||
_incoming.offer(frame)
|
||||
}
|
||||
}
|
||||
|
||||
websocket.onerror = {
|
||||
_incoming.close(WebSocketException("$it"))
|
||||
_outgoing.cancel()
|
||||
}
|
||||
|
||||
websocket.onclose = {
|
||||
launch {
|
||||
val event = it as CloseEvent
|
||||
_incoming.send(Frame.Close(CloseReason(event.code, event.reason)))
|
||||
_incoming.close()
|
||||
|
||||
_outgoing.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
launch {
|
||||
_outgoing.consumeEach {
|
||||
when (it.frameType) {
|
||||
FrameType.TEXT -> {
|
||||
val text = it.data
|
||||
websocket.send(String(text))
|
||||
}
|
||||
FrameType.BINARY -> {
|
||||
val source = it.data as Int8Array
|
||||
val frameData = source.buffer.slice(
|
||||
source.byteOffset, source.byteOffset + source.byteLength
|
||||
)
|
||||
|
||||
websocket.send(frameData)
|
||||
}
|
||||
FrameType.CLOSE -> {
|
||||
val data = buildPacket { it.data }
|
||||
websocket.close(data.readShort(), data.readText())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun flush() {
|
||||
}
|
||||
|
||||
override fun terminate() {
|
||||
_incoming.cancel()
|
||||
_outgoing.cancel()
|
||||
websocket.close()
|
||||
}
|
||||
|
||||
@KtorExperimentalAPI
|
||||
override suspend fun close(cause: Throwable?) {
|
||||
val reason = cause?.let {
|
||||
CloseReason(CloseReason.Codes.UNEXPECTED_CONDITION, cause.message ?: "")
|
||||
} ?: CloseReason(CloseReason.Codes.NORMAL, "OK")
|
||||
|
||||
_incoming.send(Frame.Close(reason))
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.util.date.*
|
||||
import kotlinx.coroutines.*
|
||||
import org.w3c.dom.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
internal actual fun findWebSocketEngine(): WebSocketEngine = DefaultJsWebSocketEngine()
|
||||
|
||||
internal class DefaultJsWebSocketEngine() : WebSocketEngine {
|
||||
override val coroutineContext: CoroutineContext = Dispatchers.Default
|
||||
|
||||
override suspend fun execute(request: HttpRequest): WebSocketResponse {
|
||||
val requestTime = GMTDate()
|
||||
val callContext = CompletableDeferred<Unit>() + coroutineContext
|
||||
|
||||
val socket = WebSocket(request.url.toString()).apply { await() }
|
||||
val session = JsWebSocketSession(callContext, socket)
|
||||
|
||||
return WebSocketResponse(callContext, requestTime, session)
|
||||
}
|
||||
|
||||
private suspend fun WebSocket.await(): Unit = suspendCancellableCoroutine { continuation ->
|
||||
onopen = {
|
||||
onopen = undefined
|
||||
onerror = undefined
|
||||
continuation.resume(Unit)
|
||||
}
|
||||
onerror = {
|
||||
continuation.resumeWithException(WebSocketException("$it"))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
internal actual fun findWebSocketEngine(): WebSocketEngine = error(
|
||||
"Failed to find WebSocket client engine implementation in the classpath: consider adding WebSocketEngine dependency."
|
||||
)
|
||||
@@ -1,65 +0,0 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
import io.ktor.client.*
|
||||
import io.ktor.client.call.*
|
||||
import io.ktor.client.features.*
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.client.response.*
|
||||
import io.ktor.http.*
|
||||
import io.ktor.http.cio.websocket.*
|
||||
import io.ktor.util.*
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.io.core.*
|
||||
import kotlin.reflect.full.*
|
||||
|
||||
/**
|
||||
* Client WebSocket feature.
|
||||
*/
|
||||
class WebSockets(
|
||||
val maxFrameSize: Long = Int.MAX_VALUE.toLong()
|
||||
) : Closeable {
|
||||
|
||||
@KtorExperimentalAPI
|
||||
val context = CompletableDeferred<Unit>()
|
||||
|
||||
override fun close() {
|
||||
context.complete(Unit)
|
||||
}
|
||||
|
||||
companion object Feature : HttpClientFeature<Unit, WebSockets> {
|
||||
override val key: AttributeKey<WebSockets> = AttributeKey("Websocket")
|
||||
|
||||
override fun prepare(block: Unit.() -> Unit): WebSockets = WebSockets()
|
||||
|
||||
override fun install(feature: WebSockets, scope: HttpClient) {
|
||||
scope.requestPipeline.intercept(HttpRequestPipeline.Render) { _ ->
|
||||
if (!context.url.protocol.isWebsocket()) return@intercept
|
||||
proceedWith(WebSocketContent())
|
||||
}
|
||||
|
||||
scope.responsePipeline.intercept(HttpResponsePipeline.Transform) { (info, response) ->
|
||||
val content = context.request.content
|
||||
|
||||
if (!info.type.isSubclassOf(WebSocketSession::class)
|
||||
|| response !is HttpResponse
|
||||
|| response.status.value != HttpStatusCode.SwitchingProtocols.value
|
||||
|| content !is WebSocketContent
|
||||
) return@intercept
|
||||
|
||||
content.verify(response.headers)
|
||||
|
||||
val raw = RawWebSocket(
|
||||
response.content, content.output,
|
||||
feature.maxFrameSize,
|
||||
coroutineContext = response.coroutineContext
|
||||
)
|
||||
|
||||
val session = object : ClientWebSocketSession, WebSocketSession by raw {
|
||||
override val call: HttpClientCall = response.call
|
||||
}
|
||||
|
||||
proceedWith(HttpResponseContainer(info, session))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,102 +0,0 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
import io.ktor.client.*
|
||||
import io.ktor.client.features.*
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.http.*
|
||||
import io.ktor.http.cio.websocket.*
|
||||
import kotlinx.coroutines.*
|
||||
|
||||
suspend fun HttpClient.webSocketRawSession(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
block: HttpRequestBuilder.() -> Unit = {}
|
||||
): ClientWebSocketSession = request {
|
||||
this.method = method
|
||||
url("ws", host, port, path)
|
||||
block()
|
||||
}
|
||||
|
||||
@UseExperimental(WebSocketInternalAPI::class)
|
||||
suspend fun HttpClient.webSocketSession(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
block: HttpRequestBuilder.() -> Unit = {}
|
||||
): DefaultClientWebSocketSession {
|
||||
val feature = feature(WebSockets) ?: error("WebSockets feature should be installed")
|
||||
val session = webSocketRawSession(method, host, port, path, block)
|
||||
val origin = DefaultWebSocketSessionImpl(session)
|
||||
|
||||
feature.context.invokeOnCompletion {
|
||||
session.launch { origin.goingAway("Client is closed") }
|
||||
}
|
||||
|
||||
return DefaultClientWebSocketSession(session.call, origin)
|
||||
}
|
||||
|
||||
suspend fun HttpClient.webSocketRaw(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
request: HttpRequestBuilder.() -> Unit = {}, block: suspend ClientWebSocketSession.() -> Unit
|
||||
): Unit {
|
||||
val session = webSocketRawSession(method, host, port, path) {
|
||||
url.protocol = URLProtocol.WS
|
||||
url.port = port
|
||||
|
||||
request()
|
||||
}
|
||||
|
||||
try {
|
||||
session.block()
|
||||
} catch (cause: Throwable) {
|
||||
session.close(cause)
|
||||
} finally {
|
||||
session.close()
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun HttpClient.webSocket(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
request: HttpRequestBuilder.() -> Unit = {}, block: suspend DefaultClientWebSocketSession.() -> Unit
|
||||
): Unit {
|
||||
val session = webSocketSession(method, host, port, path) {
|
||||
url.protocol = URLProtocol.WS
|
||||
url.port = port
|
||||
request()
|
||||
}
|
||||
|
||||
try {
|
||||
session.block()
|
||||
} catch (cause: Throwable) {
|
||||
session.close(cause)
|
||||
} finally {
|
||||
session.close()
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun HttpClient.wsRaw(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
request: HttpRequestBuilder.() -> Unit = {}, block: suspend ClientWebSocketSession.() -> Unit
|
||||
): Unit = webSocketRaw(method, host, port, path, request, block)
|
||||
|
||||
suspend fun HttpClient.wssRaw(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
request: HttpRequestBuilder.() -> Unit = {}, block: suspend ClientWebSocketSession.() -> Unit
|
||||
): Unit = webSocketRaw(method, host, port, path, request = {
|
||||
url.protocol = URLProtocol.WSS
|
||||
url.port = port
|
||||
|
||||
request()
|
||||
}, block = block)
|
||||
|
||||
suspend fun HttpClient.ws(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
request: HttpRequestBuilder.() -> Unit = {}, block: suspend DefaultClientWebSocketSession.() -> Unit
|
||||
): Unit = webSocket(method, host, port, path, request, block)
|
||||
|
||||
suspend fun HttpClient.wss(
|
||||
method: HttpMethod = HttpMethod.Get, host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/",
|
||||
request: HttpRequestBuilder.() -> Unit = {}, block: suspend DefaultClientWebSocketSession.() -> Unit
|
||||
): Unit = webSocket(method, host, port, path, request = {
|
||||
url.protocol = URLProtocol.WSS
|
||||
url.port = port
|
||||
|
||||
request()
|
||||
}, block = block)
|
||||
@@ -4,28 +4,16 @@ import io.ktor.application.*
|
||||
import io.ktor.client.engine.cio.*
|
||||
import io.ktor.client.tests.utils.*
|
||||
import io.ktor.http.cio.websocket.*
|
||||
import io.ktor.http.cio.websocket.Frame
|
||||
import io.ktor.routing.*
|
||||
import io.ktor.server.engine.*
|
||||
import io.ktor.server.netty.*
|
||||
import io.ktor.util.*
|
||||
import io.ktor.websocket.*
|
||||
import java.nio.*
|
||||
import kotlin.test.*
|
||||
|
||||
class WebSocketTest : TestWithKtor() {
|
||||
class WebSocketRawTest : TestWithKtor() {
|
||||
override val server: ApplicationEngine = embeddedServer(Netty, serverPort) {
|
||||
install(io.ktor.websocket.WebSockets)
|
||||
routing {
|
||||
webSocket("/ws") {
|
||||
for (frame in incoming) {
|
||||
when (frame) {
|
||||
is Frame.Text -> send(frame)
|
||||
is Frame.Binary -> send(frame)
|
||||
else -> assert(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
webSocketRaw("/rawEcho") {
|
||||
for (frame in incoming) {
|
||||
if (frame is Frame.Close) {
|
||||
@@ -61,57 +49,6 @@ class WebSocketTest : TestWithKtor() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testPingPong() = clientTest(CIO) {
|
||||
config {
|
||||
install(WebSockets)
|
||||
}
|
||||
|
||||
test { client ->
|
||||
client.ws(port = serverPort, path = "ws") {
|
||||
assertTrue(masking)
|
||||
|
||||
repeat(10) {
|
||||
ping(it.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testRemotePingPong(): Unit = clientTest(CIO) {
|
||||
val remote = "echo.websocket.org"
|
||||
|
||||
config {
|
||||
install(WebSockets)
|
||||
}
|
||||
|
||||
test { client ->
|
||||
client.ws(host = remote) {
|
||||
repeat(10) {
|
||||
ping(it.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testSecureRemotePingPong(): Unit = clientTest(CIO) {
|
||||
val remote = "echo.websocket.org"
|
||||
|
||||
config {
|
||||
install(WebSockets)
|
||||
}
|
||||
|
||||
test { client ->
|
||||
client.wss(host = remote) {
|
||||
repeat(10) {
|
||||
ping(it.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testConvenienceMethods() = clientTest(CIO) {
|
||||
config {
|
||||
@@ -141,20 +78,4 @@ class WebSocketTest : TestWithKtor() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun WebSocketSession.ping(salt: String) {
|
||||
outgoing.send(Frame.Text("text: $salt"))
|
||||
val frame = incoming.receive()
|
||||
assert(frame is Frame.Text)
|
||||
assertEquals("text: $salt", (frame as Frame.Text).readText())
|
||||
|
||||
val data = "text: $salt".toByteArray()
|
||||
outgoing.send(Frame.Binary(true, ByteBuffer.wrap(data)))
|
||||
val binaryFrame = incoming.receive()
|
||||
assert(binaryFrame is Frame.Binary)
|
||||
|
||||
val buffer = (binaryFrame as Frame.Binary).buffer
|
||||
val received = buffer.moveToByteArray()
|
||||
assertEquals(data.contentToString(), received.contentToString())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package io.ktor.client.features.websocket
|
||||
|
||||
|
||||
internal actual fun findWebSocketEngine(): WebSocketEngine = error(
|
||||
"Failed to find WebSocketEngine. Consider adding [WebSocketEngine] implementation in dependencies."
|
||||
)
|
||||
@@ -1,12 +1,17 @@
|
||||
package io.ktor.client.engine.ios
|
||||
|
||||
import io.ktor.client.engine.*
|
||||
import platform.Foundation.*
|
||||
import kotlin.native.concurrent.*
|
||||
|
||||
@ThreadLocal
|
||||
private val initHook = Ios
|
||||
|
||||
/**
|
||||
* [HttpClientEngineFactory] using a [NSURLRequest] in implementation
|
||||
* with the the associated configuration [HttpClientEngineConfig].
|
||||
*/
|
||||
object Ios : HttpClientEngineFactory<IosClientEngineConfig> {
|
||||
|
||||
init {
|
||||
engines.add(this)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
kotlin.sourceSets {
|
||||
jvmMain.dependencies {
|
||||
api project(':ktor-client:ktor-client-core')
|
||||
api 'com.squareup.okhttp3:okhttp:3.11.0'
|
||||
|
||||
api project(':ktor-client:ktor-client-features:ktor-client-websocket')
|
||||
api "com.squareup.okhttp3:okhttp:$okhttp_version"
|
||||
}
|
||||
jvmTest.dependencies {
|
||||
api project(':ktor-client:ktor-client-tests')
|
||||
|
||||
@@ -3,11 +3,16 @@ package io.ktor.client.engine.okhttp
|
||||
import io.ktor.client.*
|
||||
import io.ktor.client.engine.*
|
||||
|
||||
/**
|
||||
* [HttpClientEngineFactory] using a [OkHttp] based backend implementation
|
||||
* with the the associated configuration [OkHttpConfig].
|
||||
*/
|
||||
object OkHttp : HttpClientEngineFactory<OkHttpConfig> {
|
||||
override fun create(block: OkHttpConfig.() -> Unit): HttpClientEngine =
|
||||
OkHttpEngine(OkHttpConfig().apply(block))
|
||||
}
|
||||
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
class OkHttpEngineContainer : HttpClientEngineContainer {
|
||||
override val factory: HttpClientEngineFactory<*> = OkHttp
|
||||
}
|
||||
|
||||
@@ -1,8 +1,13 @@
|
||||
package io.ktor.client.engine.okhttp
|
||||
|
||||
import io.ktor.client.call.*
|
||||
import io.ktor.client.engine.*
|
||||
import io.ktor.client.features.websocket.*
|
||||
import io.ktor.client.request.*
|
||||
import io.ktor.client.response.*
|
||||
import io.ktor.http.*
|
||||
import io.ktor.http.content.*
|
||||
import io.ktor.util.*
|
||||
import io.ktor.util.cio.*
|
||||
import io.ktor.util.date.*
|
||||
import kotlinx.coroutines.*
|
||||
@@ -11,29 +16,44 @@ import kotlinx.coroutines.io.jvm.javaio.*
|
||||
import okhttp3.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
class OkHttpEngine(override val config: OkHttpConfig) : HttpClientJvmEngine("ktor-okhttp") {
|
||||
@InternalAPI
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
class OkHttpEngine(
|
||||
override val config: OkHttpConfig
|
||||
) : HttpClientJvmEngine("ktor-okhttp"), WebSocketEngine {
|
||||
|
||||
private val engine = OkHttpClient.Builder()
|
||||
.apply(config.config)
|
||||
.build()!!
|
||||
|
||||
override suspend fun execute(call: HttpClientCall, data: HttpRequestData): HttpEngineCall {
|
||||
val request = DefaultHttpRequest(call, data)
|
||||
val requestTime = GMTDate()
|
||||
val callContext = createCallContext()
|
||||
val engineRequest = request.convertToOkHttpRequest(callContext)
|
||||
val response = executeHttpRequest(engineRequest, callContext, call)
|
||||
|
||||
val builder = Request.Builder()
|
||||
return HttpEngineCall(request, response)
|
||||
}
|
||||
|
||||
with(builder) {
|
||||
url(request.url.toString())
|
||||
override suspend fun execute(request: HttpRequest): WebSocketResponse {
|
||||
check(request.url.protocol.isWebsocket())
|
||||
|
||||
mergeHeaders(request.headers, request.content) { key, value ->
|
||||
addHeader(key, value)
|
||||
}
|
||||
val callContext = createCallContext()
|
||||
val pingInterval = engine.pingIntervalMillis().toLong()
|
||||
val requestTime = GMTDate()
|
||||
val engineRequest = request.convertToOkHttpRequest(callContext)
|
||||
|
||||
method(request.method.value, request.content.convertToOkHttpBody(callContext))
|
||||
}
|
||||
val session = OkHttpWebsocketSession(engine, engineRequest, callContext)
|
||||
return WebSocketResponse(callContext, requestTime, session)
|
||||
}
|
||||
|
||||
val response = engine.execute(builder.build())
|
||||
private suspend fun executeHttpRequest(
|
||||
engineRequest: Request,
|
||||
callContext: CoroutineContext,
|
||||
call: HttpClientCall
|
||||
): HttpResponse {
|
||||
val requestTime = GMTDate()
|
||||
val response = engine.execute(engineRequest)
|
||||
|
||||
val body = response.body()
|
||||
callContext[Job]?.invokeOnCompletion { body?.close() }
|
||||
@@ -45,10 +65,26 @@ class OkHttpEngine(override val config: OkHttpConfig) : HttpClientJvmEngine("kto
|
||||
) ?: ByteReadChannel.Empty
|
||||
}
|
||||
|
||||
return HttpEngineCall(request, OkHttpResponse(response, call, requestTime, responseContent, callContext))
|
||||
return OkHttpResponse(response, call, requestTime, responseContent, callContext)
|
||||
}
|
||||
}
|
||||
|
||||
private fun HttpRequest.convertToOkHttpRequest(callContext: CoroutineContext): Request {
|
||||
val builder = Request.Builder()
|
||||
|
||||
with(builder) {
|
||||
url(url.toString())
|
||||
|
||||
mergeHeaders(headers, content) { key, value ->
|
||||
addHeader(key, value)
|
||||
}
|
||||
|
||||
method(method.value, content.convertToOkHttpBody(callContext))
|
||||
}
|
||||
|
||||
return builder.build()!!
|
||||
}
|
||||
|
||||
internal fun OutgoingContent.convertToOkHttpBody(callContext: CoroutineContext): RequestBody? = when (this) {
|
||||
is OutgoingContent.ByteArrayContent -> RequestBody.create(null, bytes())
|
||||
is OutgoingContent.ReadChannelContent -> StreamRequestBody(contentLength) { readFrom() }
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
package io.ktor.client.engine.okhttp
|
||||
|
||||
import io.ktor.client.features.websocket.*
|
||||
import io.ktor.http.cio.websocket.*
|
||||
import io.ktor.util.*
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.*
|
||||
import okhttp3.*
|
||||
import okio.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
internal class OkHttpWebsocketSession(
|
||||
private val engine: OkHttpClient,
|
||||
engineRequest: Request,
|
||||
override val coroutineContext: CoroutineContext
|
||||
) : DefaultWebSocketSession, WebSocketListener() {
|
||||
private val websocket: WebSocket = engine.newWebSocket(engineRequest, this)
|
||||
|
||||
override var pingIntervalMillis: Long
|
||||
get() = engine.pingIntervalMillis().toLong()
|
||||
set(_) = throw WebSocketException("OkHttp doesn't support dynamic ping interval. You could switch it in the engine configuration.")
|
||||
|
||||
override var timeoutMillis: Long
|
||||
get() = engine.readTimeoutMillis().toLong()
|
||||
set(_) = throw WebSocketException("Websocket timeout should be configured in OkHttpEngine.")
|
||||
|
||||
override var masking: Boolean
|
||||
get() = true
|
||||
set(_) = throw WebSocketException("Masking switch is not supported in OkHttp engine.")
|
||||
|
||||
override var maxFrameSize: Long
|
||||
get() = throw WebSocketException("OkHttp websocket doesn't support max frame size.")
|
||||
set(_) = throw WebSocketException("Websocket timeout should be configured in OkHttpEngine.")
|
||||
|
||||
private val _incoming = Channel<Frame>()
|
||||
private val _closeReason = CompletableDeferred<CloseReason?>()
|
||||
|
||||
override val incoming: ReceiveChannel<Frame>
|
||||
get() = _incoming
|
||||
|
||||
override val closeReason: Deferred<CloseReason?>
|
||||
get() = _closeReason
|
||||
|
||||
override val outgoing: SendChannel<Frame> = actor {
|
||||
try {
|
||||
for (frame in channel) {
|
||||
when (frame) {
|
||||
is Frame.Binary -> websocket.send(ByteString.of(frame.data, 0, frame.data.size))
|
||||
is Frame.Text -> websocket.send(String(frame.data))
|
||||
is Frame.Close -> {
|
||||
val reason = frame.readReason()!!
|
||||
websocket.close(reason.code.toInt(), reason.message)
|
||||
return@actor
|
||||
}
|
||||
else -> throw UnsupportedFrameTypeException(frame)
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
websocket.close(CloseReason.Codes.UNEXPECTED_CONDITION.code.toInt(), "Client failure")
|
||||
}
|
||||
}
|
||||
|
||||
override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
|
||||
super.onMessage(webSocket, bytes)
|
||||
_incoming.sendBlocking(Frame.Binary(true, bytes.toByteArray()))
|
||||
}
|
||||
|
||||
override fun onMessage(webSocket: WebSocket, text: String) {
|
||||
super.onMessage(webSocket, text)
|
||||
_incoming.sendBlocking(Frame.Text(true, text.toByteArray()))
|
||||
}
|
||||
|
||||
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
|
||||
super.onClosed(webSocket, code, reason)
|
||||
|
||||
_closeReason.complete(CloseReason(code.toShort(), reason))
|
||||
_incoming.close()
|
||||
outgoing.close()
|
||||
}
|
||||
|
||||
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
|
||||
super.onFailure(webSocket, t, response)
|
||||
|
||||
_incoming.close(t)
|
||||
outgoing.close(t)
|
||||
}
|
||||
|
||||
override suspend fun flush() {
|
||||
}
|
||||
|
||||
override fun terminate() {
|
||||
coroutineContext.cancel()
|
||||
}
|
||||
|
||||
@KtorExperimentalAPI
|
||||
override suspend fun close(cause: Throwable?) {
|
||||
outgoing.close(cause)
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
class UnsupportedFrameTypeException(frame: Frame) : IllegalArgumentException("Unsupported frame type: $frame")
|
||||
@@ -3,8 +3,11 @@ description = 'Common tests for client'
|
||||
apply plugin: "kotlinx-serialization"
|
||||
|
||||
kotlin.sourceSets {
|
||||
jvmMain.dependencies {
|
||||
commonMain.dependencies {
|
||||
api project(':ktor-client:ktor-client-core')
|
||||
api project(':ktor-client:ktor-client-tests:ktor-client-tests-dispatcher')
|
||||
}
|
||||
jvmMain.dependencies {
|
||||
api project(':ktor-server:ktor-server-jetty')
|
||||
api project(':ktor-server:ktor-server-netty')
|
||||
api group: 'ch.qos.logback', name: 'logback-classic', version: logback_version
|
||||
@@ -15,5 +18,17 @@ kotlin.sourceSets {
|
||||
runtimeOnly project(':ktor-client:ktor-client-apache')
|
||||
runtimeOnly project(':ktor-client:ktor-client-cio')
|
||||
}
|
||||
}
|
||||
if (!project.ext.ideaActive) {
|
||||
configure([linuxX64Test, mingwX64Test, macosX64Test]) {
|
||||
dependencies {
|
||||
api project(':ktor-client:ktor-client-curl')
|
||||
}
|
||||
}
|
||||
|
||||
configure([iosX64Test, iosArm32Test, iosArm64Test, macosX64Test]) {
|
||||
dependencies {
|
||||
api project(':ktor-client:ktor-client-ios')
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,74 @@
|
||||
package io.ktor.client.tests.utils
|
||||
|
||||
import io.ktor.client.*
|
||||
import io.ktor.client.engine.*
|
||||
import io.ktor.client.tests.utils.dispatcher.*
|
||||
import io.ktor.util.*
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.io.core.*
|
||||
|
||||
/**
|
||||
* Perform test against all clients from dependencies.
|
||||
*/
|
||||
expect fun clientsTest(
|
||||
block: suspend TestClientBuilder<HttpClientEngineConfig>.() -> Unit
|
||||
)
|
||||
|
||||
/**
|
||||
* Perform test with selected client [engine].
|
||||
*/
|
||||
fun clientTest(
|
||||
engine: HttpClientEngine,
|
||||
block: suspend TestClientBuilder<*>.() -> Unit
|
||||
) = clientTest(HttpClient(engine), block)
|
||||
|
||||
/**
|
||||
* Perform test with selected [client] or client loaded by service loader.
|
||||
*/
|
||||
fun clientTest(
|
||||
client: HttpClient = HttpClient(),
|
||||
block: suspend TestClientBuilder<HttpClientEngineConfig>.() -> Unit
|
||||
) = testSuspend {
|
||||
val builder = TestClientBuilder<HttpClientEngineConfig>().also { it.block() }
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
client
|
||||
.config { builder.config(this as HttpClientConfig<HttpClientEngineConfig>) }
|
||||
.use { client -> builder.test(client) }
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform test with selected client engine [factory].
|
||||
*/
|
||||
fun <T : HttpClientEngineConfig> clientTest(
|
||||
factory: HttpClientEngineFactory<T>,
|
||||
block: suspend TestClientBuilder<T>.() -> Unit
|
||||
) = testSuspend {
|
||||
val builder = TestClientBuilder<T>().apply { block() }
|
||||
val client = HttpClient(factory, block = builder.config)
|
||||
|
||||
client.use {
|
||||
builder.test(it)
|
||||
}
|
||||
|
||||
client.coroutineContext[Job]!!.join()
|
||||
}
|
||||
|
||||
@InternalAPI
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
class TestClientBuilder<T : HttpClientEngineConfig>(
|
||||
var config: HttpClientConfig<T>.() -> Unit = {},
|
||||
var test: suspend (client: HttpClient) -> Unit = {}
|
||||
)
|
||||
|
||||
@InternalAPI
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
fun <T : HttpClientEngineConfig> TestClientBuilder<T>.config(block: HttpClientConfig<T>.() -> Unit): Unit {
|
||||
config = block
|
||||
}
|
||||
|
||||
@InternalAPI
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
fun TestClientBuilder<*>.test(block: suspend (client: HttpClient) -> Unit): Unit {
|
||||
test = block
|
||||
}
|
||||
@@ -4,7 +4,6 @@ import io.ktor.client.*
|
||||
import kotlin.test.*
|
||||
|
||||
class DefaultEngineTest {
|
||||
|
||||
@Test
|
||||
fun instantiationTest() {
|
||||
val client = HttpClient()
|
||||
@@ -0,0 +1,10 @@
|
||||
package io.ktor.client.tests.utils
|
||||
|
||||
import io.ktor.client.engine.*
|
||||
|
||||
/**
|
||||
* Perform test against all clients from dependencies.
|
||||
*/
|
||||
actual fun clientsTest(
|
||||
block: suspend TestClientBuilder<HttpClientEngineConfig>.() -> Unit
|
||||
) = clientTest(block = block)
|
||||
@@ -2,48 +2,19 @@ package io.ktor.client.tests.utils
|
||||
|
||||
import io.ktor.client.*
|
||||
import io.ktor.client.engine.*
|
||||
import kotlinx.coroutines.*
|
||||
import java.util.*
|
||||
|
||||
fun <T : HttpClientEngineConfig> clientTest(
|
||||
factory: HttpClientEngineFactory<T>,
|
||||
block: suspend TestClientBuilder<T>.() -> Unit
|
||||
): Unit = runBlocking {
|
||||
val builder = TestClientBuilder<T>().apply { block() }
|
||||
val client = HttpClient(factory, block = builder.config)
|
||||
|
||||
client.use {
|
||||
builder.test(it)
|
||||
/**
|
||||
* Perform test against all clients from dependencies.
|
||||
*/
|
||||
actual fun clientsTest(
|
||||
block: suspend TestClientBuilder<HttpClientEngineConfig>.() -> Unit
|
||||
): Unit {
|
||||
val engines: List<HttpClientEngineContainer> = HttpClientEngineContainer::class.java.let {
|
||||
ServiceLoader.load(it, it.classLoader).toList()
|
||||
}
|
||||
|
||||
client.coroutineContext[Job]!!.join()
|
||||
}
|
||||
|
||||
fun clientTest(
|
||||
engine: HttpClientEngine,
|
||||
block: suspend TestClientBuilder<*>.() -> Unit
|
||||
): Unit = clientTest(HttpClient(engine), block)
|
||||
|
||||
fun clientTest(
|
||||
client: HttpClient = HttpClient(),
|
||||
block: suspend TestClientBuilder<HttpClientEngineConfig>.() -> Unit
|
||||
): Unit = runBlocking {
|
||||
val builder = TestClientBuilder<HttpClientEngineConfig>().also { it.block() }
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
client
|
||||
.config { builder.config(this as HttpClientConfig<HttpClientEngineConfig>) }
|
||||
.use { client -> builder.test(client) }
|
||||
}
|
||||
|
||||
class TestClientBuilder<T : HttpClientEngineConfig>(
|
||||
var config: HttpClientConfig<T>.() -> Unit = {},
|
||||
var test: suspend (client: HttpClient) -> Unit = {}
|
||||
)
|
||||
|
||||
fun <T : HttpClientEngineConfig> TestClientBuilder<T>.config(block: HttpClientConfig<T>.() -> Unit): Unit {
|
||||
config = block
|
||||
}
|
||||
|
||||
fun TestClientBuilder<*>.test(block: suspend (client: HttpClient) -> Unit): Unit {
|
||||
test = block
|
||||
engines.forEach {
|
||||
clientTest(it.factory, block)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
package io.ktor.client.tests.utils.dispatcher
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
/**
|
||||
* Test runner for common suspend tests.
|
||||
*/
|
||||
@Suppress("NO_ACTUAL_FOR_EXPECT")
|
||||
expect fun testSuspend(
|
||||
context: CoroutineContext = EmptyCoroutineContext,
|
||||
block: suspend CoroutineScope.() -> Unit
|
||||
)
|
||||
@@ -0,0 +1,25 @@
|
||||
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
|
||||
|
||||
package io.ktor.client.tests.utils.dispatcher
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlin.coroutines.*
|
||||
import platform.Foundation.*
|
||||
|
||||
/**
|
||||
* Test runner for native suspend tests.
|
||||
*/
|
||||
actual fun testSuspend(
|
||||
context: CoroutineContext,
|
||||
block: suspend CoroutineScope.() -> Unit
|
||||
): Unit = runBlocking {
|
||||
val loop = coroutineContext[ContinuationInterceptor] as EventLoop
|
||||
|
||||
val task = launch { block() }
|
||||
while (!task.isCompleted) {
|
||||
val date = NSDate().addTimeInterval(1.0) as NSDate
|
||||
NSRunLoop.mainRunLoop.runUntilDate(date)
|
||||
|
||||
loop.processNextEvent()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
|
||||
|
||||
package io.ktor.client.tests.utils.dispatcher
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlin.coroutines.*
|
||||
import platform.Foundation.*
|
||||
|
||||
/**
|
||||
* Test runner for native suspend tests.
|
||||
*/
|
||||
actual fun testSuspend(
|
||||
context: CoroutineContext,
|
||||
block: suspend CoroutineScope.() -> Unit
|
||||
): Unit = runBlocking {
|
||||
val loop = coroutineContext[ContinuationInterceptor] as EventLoop
|
||||
|
||||
val task = launch { block() }
|
||||
while (!task.isCompleted) {
|
||||
val date = NSDate().addTimeInterval(1.0) as NSDate
|
||||
NSRunLoop.mainRunLoop.runUntilDate(date)
|
||||
|
||||
loop.processNextEvent()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
|
||||
|
||||
package io.ktor.client.tests.utils.dispatcher
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlin.coroutines.*
|
||||
import platform.Foundation.*
|
||||
|
||||
/**
|
||||
* Test runner for native suspend tests.
|
||||
*/
|
||||
actual fun testSuspend(
|
||||
context: CoroutineContext,
|
||||
block: suspend CoroutineScope.() -> Unit
|
||||
): Unit = runBlocking {
|
||||
val loop = coroutineContext[ContinuationInterceptor] as EventLoop
|
||||
|
||||
val task = launch { block() }
|
||||
while (!task.isCompleted) {
|
||||
val date = NSDate().addTimeInterval(1.0) as NSDate
|
||||
NSRunLoop.mainRunLoop.runUntilDate(date)
|
||||
|
||||
loop.processNextEvent()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package io.ktor.client.tests.utils.dispatcher
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
/**
|
||||
* Test runner for js suspend tests.
|
||||
*/
|
||||
actual fun testSuspend(
|
||||
context: CoroutineContext,
|
||||
block: suspend CoroutineScope.() -> Unit
|
||||
): dynamic = GlobalScope.promise(block = block, context = context)
|
||||
@@ -0,0 +1,12 @@
|
||||
package io.ktor.client.tests.utils.dispatcher
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
/**
|
||||
* Test runner for jvm suspend tests.
|
||||
*/
|
||||
actual fun testSuspend(
|
||||
context: CoroutineContext,
|
||||
block: suspend CoroutineScope.() -> Unit
|
||||
): Unit = runBlocking(context, block)
|
||||
@@ -0,0 +1,12 @@
|
||||
package io.ktor.client.tests.utils.dispatcher
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
/**
|
||||
* Test runner for native suspend tests.
|
||||
*/
|
||||
actual fun testSuspend(
|
||||
context: CoroutineContext,
|
||||
block: suspend CoroutineScope.() -> Unit
|
||||
): Unit = runBlocking(context, block)
|
||||
@@ -0,0 +1,25 @@
|
||||
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
|
||||
|
||||
package io.ktor.client.tests.utils.dispatcher
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlin.coroutines.*
|
||||
import platform.Foundation.*
|
||||
|
||||
/**
|
||||
* Test runner for native suspend tests.
|
||||
*/
|
||||
actual fun testSuspend(
|
||||
context: CoroutineContext,
|
||||
block: suspend CoroutineScope.() -> Unit
|
||||
): Unit = runBlocking {
|
||||
val loop = coroutineContext[ContinuationInterceptor] as EventLoop
|
||||
|
||||
val task = launch { block() }
|
||||
while (!task.isCompleted) {
|
||||
val date = NSDate().addTimeInterval(1.0) as NSDate
|
||||
NSRunLoop.mainRunLoop.runUntilDate(date)
|
||||
|
||||
loop.processNextEvent()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package io.ktor.client.tests.utils.dispatcher
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlin.coroutines.*
|
||||
|
||||
/**
|
||||
* Test runner for native suspend tests.
|
||||
*/
|
||||
actual fun testSuspend(
|
||||
context: CoroutineContext,
|
||||
block: suspend CoroutineScope.() -> Unit
|
||||
): Unit = runBlocking(context, block)
|
||||
@@ -0,0 +1,12 @@
|
||||
package io.ktor.client.tests.utils
|
||||
|
||||
import io.ktor.client.engine.*
|
||||
|
||||
/**
|
||||
* Perform test against all clients from dependencies.
|
||||
*/
|
||||
actual fun clientsTest(
|
||||
block: suspend TestClientBuilder<HttpClientEngineConfig>.() -> Unit
|
||||
) {
|
||||
engines.forEach { clientTest(it, block) }
|
||||
}
|
||||
@@ -1,16 +1,11 @@
|
||||
|
||||
subprojects {
|
||||
kotlin {
|
||||
sourceSets {
|
||||
commonMain {
|
||||
dependencies {
|
||||
api project(':ktor-server:ktor-server-core')
|
||||
}
|
||||
}
|
||||
commonTest {
|
||||
dependencies {
|
||||
api project(":ktor-server:ktor-server-test-host")
|
||||
}
|
||||
}
|
||||
kotlin.sourceSets {
|
||||
commonMain.dependencies {
|
||||
api project(':ktor-server:ktor-server-core')
|
||||
}
|
||||
commonTest.dependencies {
|
||||
api project(":ktor-server:ktor-server-test-host")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,7 +138,7 @@ private class WebSocketProtocolsSelector(
|
||||
) : RouteSelector(RouteSelectorEvaluation.qualityConstant) {
|
||||
override fun evaluate(context: RoutingResolveContext, segmentIndex: Int): RouteSelectorEvaluation {
|
||||
val protocols = context.call.request.headers[HttpHeaders.SecWebSocketProtocol]
|
||||
?: return RouteSelectorEvaluation.Failed
|
||||
?: return RouteSelectorEvaluation.Failed
|
||||
|
||||
if (requiredProtocol in parseHeaderValue(protocols).map { it.value }) {
|
||||
return RouteSelectorEvaluation.Constant
|
||||
|
||||
@@ -26,7 +26,9 @@ import java.util.concurrent.CancellationException
|
||||
import kotlin.test.*
|
||||
|
||||
@UseExperimental(WebSocketInternalAPI::class, ObsoleteCoroutinesApi::class)
|
||||
abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration : ApplicationEngine.Configuration>(hostFactory: ApplicationEngineFactory<TEngine, TConfiguration>) : EngineTestBase<TEngine, TConfiguration>(hostFactory) {
|
||||
abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration : ApplicationEngine.Configuration>(
|
||||
hostFactory: ApplicationEngineFactory<TEngine, TConfiguration>
|
||||
) : EngineTestBase<TEngine, TConfiguration>(hostFactory) {
|
||||
@get:Rule
|
||||
val errors = ErrorCollector()
|
||||
|
||||
@@ -57,7 +59,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
socket {
|
||||
// send upgrade request
|
||||
outputStream.apply {
|
||||
write("""
|
||||
write(
|
||||
"""
|
||||
GET / HTTP/1.1
|
||||
Host: localhost:$port
|
||||
Upgrade: websocket
|
||||
@@ -66,7 +69,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
Origin: http://localhost:$port
|
||||
Sec-WebSocket-Protocol: chat
|
||||
Sec-WebSocket-Version: 13
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray())
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray()
|
||||
)
|
||||
write("\r\n\r\n".toByteArray())
|
||||
flush()
|
||||
}
|
||||
@@ -114,7 +118,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
socket {
|
||||
// send upgrade request
|
||||
outputStream.apply {
|
||||
write("""
|
||||
write(
|
||||
"""
|
||||
GET / HTTP/1.1
|
||||
Host: localhost:$port
|
||||
Upgrade: websocket
|
||||
@@ -123,7 +128,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
Origin: http://localhost:$port
|
||||
Sec-WebSocket-Protocol: chat
|
||||
Sec-WebSocket-Version: 13
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray())
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray()
|
||||
)
|
||||
write("\r\n\r\n".toByteArray())
|
||||
flush()
|
||||
}
|
||||
@@ -189,7 +195,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
socket {
|
||||
// send upgrade request
|
||||
outputStream.apply {
|
||||
write("""
|
||||
write(
|
||||
"""
|
||||
GET / HTTP/1.1
|
||||
Host: localhost:$port
|
||||
Upgrade: websocket
|
||||
@@ -198,7 +205,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
Origin: http://localhost:$port
|
||||
Sec-WebSocket-Protocol: chat
|
||||
Sec-WebSocket-Version: 13
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray())
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray()
|
||||
)
|
||||
write("\r\n\r\n".toByteArray())
|
||||
flush()
|
||||
}
|
||||
@@ -250,7 +258,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
socket {
|
||||
// send upgrade request
|
||||
outputStream.apply {
|
||||
write("""
|
||||
write(
|
||||
"""
|
||||
GET / HTTP/1.1
|
||||
Host: localhost:$port
|
||||
Upgrade: websocket
|
||||
@@ -259,7 +268,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
Origin: http://localhost:$port
|
||||
Sec-WebSocket-Protocol: chat
|
||||
Sec-WebSocket-Version: 13
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray())
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray()
|
||||
)
|
||||
write("\r\n\r\n".toByteArray())
|
||||
flush()
|
||||
}
|
||||
@@ -317,7 +327,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
socket {
|
||||
// send upgrade request
|
||||
outputStream.apply {
|
||||
write("""
|
||||
write(
|
||||
"""
|
||||
GET / HTTP/1.1
|
||||
Host: localhost:$port
|
||||
Upgrade: websocket
|
||||
@@ -326,7 +337,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
Origin: http://localhost:$port
|
||||
Sec-WebSocket-Protocol: chat
|
||||
Sec-WebSocket-Version: 13
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray())
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray()
|
||||
)
|
||||
write("\r\n\r\n".toByteArray())
|
||||
flush()
|
||||
}
|
||||
@@ -399,7 +411,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
socket {
|
||||
// send upgrade request
|
||||
outputStream.apply {
|
||||
write("""
|
||||
write(
|
||||
"""
|
||||
GET / HTTP/1.1
|
||||
Host: localhost:$port
|
||||
Upgrade: websocket
|
||||
@@ -408,7 +421,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
Origin: http://localhost:$port
|
||||
Sec-WebSocket-Protocol: chat
|
||||
Sec-WebSocket-Version: 13
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray())
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray()
|
||||
)
|
||||
write("\r\n\r\n".toByteArray())
|
||||
flush()
|
||||
}
|
||||
@@ -462,7 +476,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
socket {
|
||||
// send upgrade request
|
||||
outputStream.apply {
|
||||
write("""
|
||||
write(
|
||||
"""
|
||||
GET / HTTP/1.1
|
||||
Host: localhost:$port
|
||||
Upgrade: websocket
|
||||
@@ -471,7 +486,8 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
Origin: http://localhost:$port
|
||||
Sec-WebSocket-Protocol: chat
|
||||
Sec-WebSocket-Version: 13
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray())
|
||||
""".trimIndent().replace("\n", "\r\n").toByteArray()
|
||||
)
|
||||
write("\r\n\r\n".toByteArray())
|
||||
flush()
|
||||
}
|
||||
@@ -503,8 +519,10 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
}
|
||||
}
|
||||
|
||||
private fun Socket.assertCloseFrame(closeCode: Short = CloseReason.Codes.NORMAL.code,
|
||||
replyCloseFrame: Boolean = true) {
|
||||
private fun Socket.assertCloseFrame(
|
||||
closeCode: Short = CloseReason.Codes.NORMAL.code,
|
||||
replyCloseFrame: Boolean = true
|
||||
) {
|
||||
loop@
|
||||
while (true) {
|
||||
val frame = getInputStream().readFrame()
|
||||
@@ -529,9 +547,10 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
val opcodeAndFin = readOrFail()
|
||||
val lenAndMask = readOrFail()
|
||||
|
||||
val frameType = FrameType[opcodeAndFin and 0x0f] ?: throw IllegalStateException("Wrong opcode ${opcodeAndFin and 0x0f}")
|
||||
val fin = (opcodeAndFin and 0x80) != 0
|
||||
val frameType = FrameType[opcodeAndFin and 0x0f]
|
||||
?: throw IllegalStateException("Wrong opcode ${opcodeAndFin and 0x0f}")
|
||||
|
||||
val fin = (opcodeAndFin and 0x80) != 0
|
||||
val len1 = lenAndMask and 0x7f
|
||||
val mask = (lenAndMask and 0x80) != 0
|
||||
|
||||
@@ -600,13 +619,13 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
|
||||
|
||||
private fun InputStream.readShortBE() = (readOrFail() shl 8) or readOrFail()
|
||||
private fun InputStream.readLongBE() = (readOrFail().toLong() shl 56) or
|
||||
(readOrFail().toLong() shl 48) or
|
||||
(readOrFail().toLong() shl 40) or
|
||||
(readOrFail().toLong() shl 32) or
|
||||
(readOrFail().toLong() shl 24) or
|
||||
(readOrFail().toLong() shl 16) or
|
||||
(readOrFail().toLong() shl 8) or
|
||||
readOrFail().toLong()
|
||||
(readOrFail().toLong() shl 48) or
|
||||
(readOrFail().toLong() shl 40) or
|
||||
(readOrFail().toLong() shl 32) or
|
||||
(readOrFail().toLong() shl 24) or
|
||||
(readOrFail().toLong() shl 16) or
|
||||
(readOrFail().toLong() shl 8) or
|
||||
readOrFail().toLong()
|
||||
|
||||
private fun InputStream.readFully(size: Int): ByteArray {
|
||||
val array = ByteArray(size)
|
||||
|
||||
@@ -19,6 +19,14 @@ fun String.encodeBase64(): String = buildPacket {
|
||||
writeStringUtf8(this@encodeBase64)
|
||||
}.encodeBase64()
|
||||
|
||||
/**
|
||||
* Encode [ByteArray] in base64 format
|
||||
*/
|
||||
@InternalAPI
|
||||
fun ByteArray.encodeBase64(): String = buildPacket {
|
||||
writeFully(this@encodeBase64)
|
||||
}.encodeBase64()
|
||||
|
||||
/**
|
||||
* Encode [ByteReadPacket] in base64 format
|
||||
*/
|
||||
|
||||
10
ktor-utils/common/src/io/ktor/util/Bytes.kt
Normal file
10
ktor-utils/common/src/io/ktor/util/Bytes.kt
Normal file
@@ -0,0 +1,10 @@
|
||||
package io.ktor.util
|
||||
|
||||
/**
|
||||
* Read [Short] with specified [offset] from [ByteArray].
|
||||
*/
|
||||
@InternalAPI
|
||||
fun ByteArray.readShort(offset: Int): Short {
|
||||
val result = ((this[offset].toInt() and 0xFF) shl 8) or (this[offset + 1].toInt() and 0xFF)
|
||||
return result.toShort()
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
@file:kotlin.jvm.JvmMultifileClass
|
||||
@file:kotlin.jvm.JvmName("CryptoKt")
|
||||
|
||||
package io.ktor.util
|
||||
|
||||
import kotlinx.io.charsets.*
|
||||
@@ -51,6 +52,12 @@ fun generateNonce(size: Int): ByteArray = buildPacket {
|
||||
}
|
||||
}.readBytes(size)
|
||||
|
||||
/**
|
||||
* Compute SHA-1 hash for the specified [bytes]
|
||||
*/
|
||||
@KtorExperimentalAPI
|
||||
expect fun sha1(bytes: ByteArray): ByteArray
|
||||
|
||||
@InternalAPI
|
||||
expect fun Digest(name: String): Digest
|
||||
|
||||
|
||||
@@ -40,3 +40,11 @@ private external object crypto {
|
||||
private external class SubtleCrypto {
|
||||
fun digest(algoName: String, buffer: ByteArray): Promise<ArrayBuffer>
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute SHA-1 hash for the specified [bytes]
|
||||
*/
|
||||
@KtorExperimentalAPI
|
||||
actual fun sha1(bytes: ByteArray): ByteArray {
|
||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
@file:kotlin.jvm.JvmMultifileClass
|
||||
@file:kotlin.jvm.JvmName("CryptoKt")
|
||||
|
||||
package io.ktor.util
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
@@ -22,7 +23,11 @@ private fun getDigest(text: String, algorithm: String, salt: String): ByteArray
|
||||
* Compute SHA-1 hash for the specified [bytes]
|
||||
*/
|
||||
@KtorExperimentalAPI
|
||||
fun sha1(bytes: ByteArray): ByteArray = MessageDigest.getInstance("SHA1").digest(bytes)!!
|
||||
actual fun sha1(bytes: ByteArray): ByteArray = runBlocking {
|
||||
Digest("SHA1").also { it += bytes }.build()
|
||||
}
|
||||
|
||||
actual fun Digest(name: String): Digest = DigestImpl(MessageDigest.getInstance(name))
|
||||
|
||||
private inline class DigestImpl(val delegate: MessageDigest) : Digest {
|
||||
override fun plusAssign(bytes: ByteArray) {
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
package = utils
|
||||
compilerOpts.mingw_x64 = -DMINGW
|
||||
compilerOpts.ios_x64 = -DIOS
|
||||
compilerOpts.ios_arm64 = -DIOS
|
||||
compilerOpts.ios_arm32 = -DIOS
|
||||
compilerOpts.macos_x64 = -DMACOS
|
||||
compilerOpts.linux_x64 = -DLINUX
|
||||
---
|
||||
#include <pthread.h>
|
||||
#include <time.h>
|
||||
|
||||
@@ -5,3 +5,11 @@ actual fun generateNonce(): String = error("[generateNonce] is not supported on
|
||||
|
||||
@InternalAPI
|
||||
actual fun Digest(name: String): Digest = error("[Digest] is not supported on iOS")
|
||||
|
||||
/**
|
||||
* Compute SHA-1 hash for the specified [bytes]
|
||||
*/
|
||||
@KtorExperimentalAPI
|
||||
actual fun sha1(bytes: ByteArray): ByteArray {
|
||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ includeEx ':ktor-server'
|
||||
includeEx ':ktor-client'
|
||||
includeEx ':ktor-client:ktor-client-core'
|
||||
includeEx ':ktor-client:ktor-client-tests'
|
||||
includeEx ':ktor-client:ktor-client-tests:ktor-client-tests-dispatcher'
|
||||
includeEx ':ktor-client:ktor-client-apache'
|
||||
includeEx ':ktor-client:ktor-client-android'
|
||||
includeEx ':ktor-client:ktor-client-cio'
|
||||
|
||||
Reference in New Issue
Block a user