Fix server upgrade completion

This commit is contained in:
Sergey Mashkov
2020-02-04 20:57:44 +03:00
parent 1734027899
commit c9217e1008
3 changed files with 26 additions and 4 deletions

View File

@@ -89,7 +89,7 @@ internal class CIOApplicationResponse(call: CIOApplicationCall,
sendResponseMessage(contentReady = false)
val upgradedJob = upgrade.upgrade(input, output, engineDispatcher, userDispatcher)
upgradedJob.invokeOnCompletion { output.close() }
upgradedJob.invokeOnCompletion { output.close(); input.cancel() }
upgradedJob.join()
}

View File

@@ -96,6 +96,7 @@ internal class NettyHttp1ApplicationResponse(call: NettyApplicationCall,
job.invokeOnCompletion {
upgradedWriteChannel.close()
bodyHandler.close()
upgradedReadChannel.cancel()
}
(call as NettyApplicationCall).responseWriteJob.join()

View File

@@ -8,8 +8,10 @@ import io.ktor.http.content.*
import io.ktor.server.engine.*
import io.ktor.util.*
import io.ktor.util.cio.*
import io.ktor.utils.io.*
import kotlinx.coroutines.*
import io.ktor.utils.io.jvm.javaio.*
import kotlinx.coroutines.CancellationException
import javax.servlet.http.*
import kotlin.coroutines.*
@@ -68,13 +70,18 @@ class UpgradeRequest(
private val ServletUpgradeCoroutineName = CoroutineName("servlet-upgrade")
// this class is instantiated by a servlet container
// so we can't pass [UpgradeRequest] through a constructor
// we also can't make it internal due to the same reason
@InternalAPI
@EngineAPI
@Suppress("KDocMissingDocumentation")
class ServletUpgradeHandler : HttpUpgradeHandler, CoroutineScope {
@Volatile
lateinit var up: UpgradeRequest
private val upgradeJob: CompletableJob = Job()
@Volatile
lateinit var upgradeJob: CompletableJob
override val coroutineContext: CoroutineContext get() = upgradeJob
@@ -83,6 +90,7 @@ class ServletUpgradeHandler : HttpUpgradeHandler, CoroutineScope {
throw IllegalArgumentException("Upgrade processing requires WebConnection instance")
}
upgradeJob = Job(up.engineContext[Job])
upgradeJob.invokeOnCompletion {
webConnection.close()
}
@@ -98,11 +106,24 @@ class ServletUpgradeHandler : HttpUpgradeHandler, CoroutineScope {
val outputChannel = servletWriter(webConnection.outputStream).channel
launch(up.userContext + ServletUpgradeCoroutineName, start = CoroutineStart.UNDISPATCHED) {
up.upgradeMessage.upgrade(inputChannel, outputChannel, up.engineContext, up.userContext)
val job = up.upgradeMessage.upgrade(
inputChannel, outputChannel,
up.engineContext + upgradeJob, up.userContext + upgradeJob
)
upgradeJob.complete()
job.invokeOnCompletion {
inputChannel.cancel()
outputChannel.close()
upgradeJob.cancel()
}
}
}
override fun destroy() {
upgradeJob.completeExceptionally(CancellationException("Upgraded WebConnection destroyed"))
try {
upgradeJob.completeExceptionally(CancellationException("Upgraded WebConnection destroyed"))
} catch (_: Throwable) {
}
}
}