From c9217e10088fb430aa14f63f32c1726161ce53dc Mon Sep 17 00:00:00 2001 From: Sergey Mashkov Date: Tue, 4 Feb 2020 20:57:44 +0300 Subject: [PATCH] Fix server upgrade completion --- .../ktor/server/cio/CIOApplicationResponse.kt | 2 +- .../http1/NettyHttp1ApplicationResponse.kt | 1 + .../io/ktor/server/servlet/ServletUpgrade.kt | 27 ++++++++++++++++--- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/ktor-server/ktor-server-cio/jvm/src/io/ktor/server/cio/CIOApplicationResponse.kt b/ktor-server/ktor-server-cio/jvm/src/io/ktor/server/cio/CIOApplicationResponse.kt index c44f3f53f..7228105bd 100644 --- a/ktor-server/ktor-server-cio/jvm/src/io/ktor/server/cio/CIOApplicationResponse.kt +++ b/ktor-server/ktor-server-cio/jvm/src/io/ktor/server/cio/CIOApplicationResponse.kt @@ -89,7 +89,7 @@ internal class CIOApplicationResponse(call: CIOApplicationCall, sendResponseMessage(contentReady = false) val upgradedJob = upgrade.upgrade(input, output, engineDispatcher, userDispatcher) - upgradedJob.invokeOnCompletion { output.close() } + upgradedJob.invokeOnCompletion { output.close(); input.cancel() } upgradedJob.join() } diff --git a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationResponse.kt b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationResponse.kt index 1b81e6036..ebe282307 100644 --- a/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationResponse.kt +++ b/ktor-server/ktor-server-netty/jvm/src/io/ktor/server/netty/http1/NettyHttp1ApplicationResponse.kt @@ -96,6 +96,7 @@ internal class NettyHttp1ApplicationResponse(call: NettyApplicationCall, job.invokeOnCompletion { upgradedWriteChannel.close() bodyHandler.close() + upgradedReadChannel.cancel() } (call as NettyApplicationCall).responseWriteJob.join() diff --git a/ktor-server/ktor-server-servlet/jvm/src/io/ktor/server/servlet/ServletUpgrade.kt b/ktor-server/ktor-server-servlet/jvm/src/io/ktor/server/servlet/ServletUpgrade.kt index f4290750a..e21e27b0b 100644 --- a/ktor-server/ktor-server-servlet/jvm/src/io/ktor/server/servlet/ServletUpgrade.kt +++ b/ktor-server/ktor-server-servlet/jvm/src/io/ktor/server/servlet/ServletUpgrade.kt @@ -8,8 +8,10 @@ import io.ktor.http.content.* import io.ktor.server.engine.* import io.ktor.util.* import io.ktor.util.cio.* +import io.ktor.utils.io.* import kotlinx.coroutines.* import io.ktor.utils.io.jvm.javaio.* +import kotlinx.coroutines.CancellationException import javax.servlet.http.* import kotlin.coroutines.* @@ -68,13 +70,18 @@ class UpgradeRequest( private val ServletUpgradeCoroutineName = CoroutineName("servlet-upgrade") +// this class is instantiated by a servlet container +// so we can't pass [UpgradeRequest] through a constructor +// we also can't make it internal due to the same reason @InternalAPI @EngineAPI @Suppress("KDocMissingDocumentation") class ServletUpgradeHandler : HttpUpgradeHandler, CoroutineScope { @Volatile lateinit var up: UpgradeRequest - private val upgradeJob: CompletableJob = Job() + + @Volatile + lateinit var upgradeJob: CompletableJob override val coroutineContext: CoroutineContext get() = upgradeJob @@ -83,6 +90,7 @@ class ServletUpgradeHandler : HttpUpgradeHandler, CoroutineScope { throw IllegalArgumentException("Upgrade processing requires WebConnection instance") } + upgradeJob = Job(up.engineContext[Job]) upgradeJob.invokeOnCompletion { webConnection.close() } @@ -98,11 +106,24 @@ class ServletUpgradeHandler : HttpUpgradeHandler, CoroutineScope { val outputChannel = servletWriter(webConnection.outputStream).channel launch(up.userContext + ServletUpgradeCoroutineName, start = CoroutineStart.UNDISPATCHED) { - up.upgradeMessage.upgrade(inputChannel, outputChannel, up.engineContext, up.userContext) + val job = up.upgradeMessage.upgrade( + inputChannel, outputChannel, + up.engineContext + upgradeJob, up.userContext + upgradeJob + ) + + upgradeJob.complete() + job.invokeOnCompletion { + inputChannel.cancel() + outputChannel.close() + upgradeJob.cancel() + } } } override fun destroy() { - upgradeJob.completeExceptionally(CancellationException("Upgraded WebConnection destroyed")) + try { + upgradeJob.completeExceptionally(CancellationException("Upgraded WebConnection destroyed")) + } catch (_: Throwable) { + } } }