mirror of
https://github.com/jlengrand/ktor.git
synced 2026-03-10 08:31:20 +00:00
Code cleanup: extract few code blocks to separate functions + fix comments.
This commit is contained in:
@@ -125,22 +125,33 @@ private suspend fun WebSocketServerSession.proceedWebSocket(handler: suspend Def
|
||||
webSockets.pingInterval?.toMillis() ?: -1L,
|
||||
webSockets.timeout.toMillis()
|
||||
)
|
||||
session.run {
|
||||
try {
|
||||
toServerSession(call).handler()
|
||||
session.close()
|
||||
} catch (cancelled: CancellationException) {
|
||||
throw cancelled
|
||||
} catch (io: ChannelIOException) {
|
||||
// don't log I/O exceptions
|
||||
throw io
|
||||
} catch (cause: Throwable) {
|
||||
application.log.error("Websocket handler failed", cause)
|
||||
throw cause
|
||||
}
|
||||
}
|
||||
session.handleServerSession(call, handler)
|
||||
|
||||
session.coroutineContext[Job]!!.join()
|
||||
session.joinSession()
|
||||
}
|
||||
|
||||
private suspend fun CoroutineScope.joinSession() {
|
||||
coroutineContext[Job]!!.join()
|
||||
}
|
||||
|
||||
@UseExperimental(WebSocketInternalAPI::class)
|
||||
private suspend fun DefaultWebSocketSessionImpl.handleServerSession(
|
||||
call: ApplicationCall,
|
||||
handler: suspend DefaultWebSocketServerSession.() -> Unit
|
||||
) {
|
||||
try {
|
||||
val serverSession = toServerSession(call)
|
||||
handler(serverSession)
|
||||
close()
|
||||
} catch (cancelled: CancellationException) {
|
||||
throw cancelled
|
||||
} catch (io: ChannelIOException) {
|
||||
// don't log I/O exceptions
|
||||
throw io
|
||||
} catch (cause: Throwable) {
|
||||
call.application.log.error("Websocket handler failed", cause)
|
||||
throw cause
|
||||
}
|
||||
}
|
||||
|
||||
private class WebSocketProtocolsSelector(
|
||||
|
||||
@@ -64,7 +64,7 @@ fun CoroutineScope.pinger(
|
||||
try {
|
||||
while (!isClosedForReceive) {
|
||||
// drop pongs during period delay as they are irrelevant
|
||||
// here timeout is expected so ignore it
|
||||
// here we expect a timeout, so ignore it
|
||||
withTimeoutOrNull(periodMillis) {
|
||||
while (true) {
|
||||
receive() // timeout causes loop to break on receive
|
||||
@@ -86,7 +86,7 @@ fun CoroutineScope.pinger(
|
||||
|
||||
if (rc == null) {
|
||||
// timeout
|
||||
// we were unable to send ping or hadn't get valid pong message in time
|
||||
// we were unable to send the ping or hadn't got a valid pong message in time,
|
||||
// so we are triggering close sequence (if already started then the following close frame could be ignored)
|
||||
|
||||
val closeFrame = Frame.Close(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Ping timeout"))
|
||||
@@ -116,12 +116,15 @@ private suspend fun SendChannel<Frame.Ping>.sendPing(
|
||||
) = with(buffer) {
|
||||
clear()
|
||||
encoder.reset()
|
||||
|
||||
encoder.encode(CharBuffer.wrap(content), this, true).apply {
|
||||
if (this.isError) throwException()
|
||||
else if (this.isOverflow) throwException()
|
||||
}
|
||||
encoder.encodeOrFail(this, content)
|
||||
flip()
|
||||
|
||||
send(Frame.Ping(this))
|
||||
}
|
||||
|
||||
private fun CharsetEncoder.encodeOrFail(buffer: ByteBuffer, content: String) {
|
||||
encode(CharBuffer.wrap(content), buffer, true).apply {
|
||||
if (isError) throwException()
|
||||
else if (isOverflow) throwException()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user