resolve connection problems between all versions of daemon

This commit is contained in:
Vadim Brilyantov
2018-03-19 21:05:13 +03:00
parent 2da6ec4a36
commit f273deb18c
11 changed files with 247 additions and 105 deletions

View File

@@ -38,7 +38,8 @@ suspend fun walkDaemonsAsync(
fileToCompareTimestamp: File,
filter: (File, Int) -> Boolean = { _, _ -> true },
report: (DaemonReportCategory, String) -> Unit = { _, _ -> },
useRMI: Boolean = true
useRMI: Boolean = true,
useSockets: Boolean = true
): List<DaemonWithMetadataAsync> = runBlocking(Unconfined) {
// : Sequence<DaemonWithMetadataAsync>
val classPathDigest = compilerId.compilerClasspath.map { File(it).absolutePath }.distinctStringsDigest().toHexString()
@@ -53,27 +54,27 @@ suspend fun walkDaemonsAsync(
assert(port!! in 1..(MAX_PORT_NUMBER - 1))
val relativeAge = fileToCompareTimestamp.lastModified() - file.lastModified()
report(
org.jetbrains.kotlin.daemon.common.DaemonReportCategory.DEBUG,
DaemonReportCategory.DEBUG,
"found daemon on socketPort $port ($relativeAge ms old), trying to connect"
)
log.info("found daemon on socketPort $port ($relativeAge ms old), trying to connect")
val daemon = tryConnectToDaemonAsync(port, report, useRMI)
val daemon = tryConnectToDaemonAsync(port, report, useRMI, useSockets)
log.info("daemon = $daemon (port= $port)")
// cleaning orphaned file; note: daemon should shut itself down if it detects that the runServer file is deleted
if (daemon == null) {
if (relativeAge - ORPHANED_RUN_FILE_AGE_THRESHOLD_MS <= 0) {
report(
org.jetbrains.kotlin.daemon.common.DaemonReportCategory.DEBUG,
DaemonReportCategory.DEBUG,
"found fresh runServer file '${file.absolutePath}' ($relativeAge ms old), but no daemon, ignoring it"
)
} else {
report(
org.jetbrains.kotlin.daemon.common.DaemonReportCategory.DEBUG,
DaemonReportCategory.DEBUG,
"found seemingly orphaned runServer file '${file.absolutePath}' ($relativeAge ms old), deleting it"
)
if (!file.delete()) {
report(
org.jetbrains.kotlin.daemon.common.DaemonReportCategory.INFO,
DaemonReportCategory.INFO,
"WARNING: unable to delete seemingly orphaned file '${file.absolutePath}', cleanup recommended"
)
}
@@ -111,7 +112,7 @@ private inline fun tryConnectToDaemonByRMI(port: Int, report: (DaemonReportCateg
)?.lookup(COMPILER_SERVICE_RMI_NAME)
when (daemon) {
null -> report(DaemonReportCategory.INFO, "daemon not found")
is CompileService -> return daemon.toClient()
is CompileService -> return daemon.toClient(port)
else -> report(DaemonReportCategory.INFO, "Unable to cast compiler service, actual class received: ${daemon::class.java.name}")
}
} catch (e: Throwable) {
@@ -141,9 +142,10 @@ private inline fun tryConnectToDaemonBySockets(port: Int, report: (DaemonReportC
private fun tryConnectToDaemonAsync(
port: Int,
report: (DaemonReportCategory, String) -> Unit,
useRMI: Boolean = true
useRMI: Boolean = true,
useSockets: Boolean = true
): CompileServiceClientSide? =
tryConnectToDaemonBySockets(port, report)
useSockets.takeIf { it }?.let { tryConnectToDaemonBySockets(port, report) }
?: (useRMI.takeIf { it }?.let { tryConnectToDaemonByRMI(port, report) })
private const val validFlagFileKeywordChars = "abcdefghijklmnopqrstuvwxyz0123456789-_"

View File

@@ -13,7 +13,7 @@ import org.jetbrains.kotlin.daemon.common.experimental.socketInfrastructure.Clie
import org.jetbrains.kotlin.daemon.common.experimental.socketInfrastructure.DefaultClientRMIWrapper
import java.io.File
class CompileServiceAsyncWrapper(val rmiCompileService: CompileService) : CompileServiceClientSide, Client by DefaultClientRMIWrapper() {
class CompileServiceAsyncWrapper(val rmiCompileService: CompileService, override val serverPort: Int) : CompileServiceClientSide, Client by DefaultClientRMIWrapper() {
override suspend fun compile(
sessionId: Int,
@@ -109,4 +109,4 @@ class CompileServiceAsyncWrapper(val rmiCompileService: CompileService) : Compil
}
fun CompileService.toClient() = CompileServiceAsyncWrapper(this)
fun CompileService.toClient(serverPort: Int) = CompileServiceAsyncWrapper(this, serverPort)

View File

@@ -19,10 +19,13 @@ import org.jetbrains.kotlin.daemon.common.experimental.socketInfrastructure.Serv
import java.io.File
import java.util.logging.Logger
interface CompileServiceClientSide : CompileServiceAsync, Client
interface CompileServiceClientSide: CompileServiceAsync, Client {
val serverPort: Int
}
class CompileServiceClientSideImpl(
val serverPort: Int,
override val serverPort: Int,
val serverHost: String
) : CompileServiceClientSide, Client by DefaultClient(serverPort, serverHost) {

View File

@@ -36,7 +36,10 @@ class CompileServiceRMIWrapper(val server: CompileServiceServerSide, daemonOptio
}
override fun getDaemonJVMOptions() = runBlocking(Unconfined) {
server.getDaemonJVMOptions()
log.info("in wrapper's getDaemonJVMOptions")
server.getDaemonJVMOptions().also {
log.info("server returned ${if (it.isGood) it.get() else it}")
}
}
override fun registerClient(aliveFlagPath: String?) = runBlocking(Unconfined) {
@@ -170,11 +173,14 @@ class CompileServiceRMIWrapper(val server: CompileServiceServerSide, daemonOptio
init {
// assuming logically synchronized
println("_______________________________ [RMI WRAPPER] <init> _________________________________")
try {
// cleanup for the case of incorrect restart and many other situations
UnicastRemoteObject.unexportObject(this, false)
println("_______________________________ [RMI WRAPPER] unexportObject_________________________________")
} catch (e: NoSuchObjectException) {
// ignoring if object already exported
println("_______________________________ [RMI WRAPPER] // ignoring if object already exported_________________________________")
}
val (registry, port) = findPortAndCreateRegistry(
@@ -183,6 +189,8 @@ class CompileServiceRMIWrapper(val server: CompileServiceServerSide, daemonOptio
RMI_WRAPPER_PORTS_RANGE_END
)
println("_______________________________ [RMI WRAPPER] port = $port , registry = $registry _________________________________")
val stub = UnicastRemoteObject.exportObject(
this,
port,
@@ -190,7 +198,10 @@ class CompileServiceRMIWrapper(val server: CompileServiceServerSide, daemonOptio
LoopbackNetworkInterface.serverLoopbackSocketFactory
) as CompileService
println("_______________________________ [RMI WRAPPER] stub = $stub _________________________________")
registry.rebind(COMPILER_SERVICE_RMI_NAME, stub)
println("_______________________________ [RMI WRAPPER] rebinded! _________________________________")
// create file :
val runFileDir = File(daemonOptions.runFilesPathOrDefault)

View File

@@ -11,6 +11,8 @@ import java.util.ArrayList
import java.util.function.Function
import java.util.logging.Logger
val BYTES_TOKEN = byteArrayOf(1, 2, 3, 4)
interface Client : Serializable, AutoCloseable {
@Throws(Exception::class)
fun connectToServer()
@@ -64,6 +66,7 @@ class DefaultClient(
log.info("OK serv.openIO() |port=$serverPort|")
input = it.input
output = it.output
output.printBytes(BYTES_TOKEN)
}
}
}

View File

@@ -1,29 +0,0 @@
/*
* Copyright 2000-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license
* that can be found in the license/LICENSE.txt file.
*/
package org.jetbrains.kotlin.daemon.common.experimental.socketInfrastructure
import java.io.File
import java.io.PrintWriter
import java.util.*
import kotlin.concurrent.schedule
object Report {
private val _log_file: PrintWriter by lazy {
/*val f = */File("_LOG_.txt").printWriter()
// Timer().schedule(10000) {
// _log_file.close()
// }
// f
}
public fun log(debugString: String, classs: String) {
"[$classs] : $debugString".let {
// _log_file.println(it)
println(it)
}
}
}

View File

@@ -29,13 +29,27 @@ interface Server<out T : ServerBase> : ServerBase {
suspend fun processMessage(msg: AnyMessage<in T>, output: ByteWriteChannelWrapper): State = when (msg) {
is Server.Message<in T> -> Server.State.WORKING.also { msg.process(this as T, output) }
is Server.EndConnectionMessage<in T> -> Server.State.CLOSED
is Server.EndConnectionMessage<in T> -> {
println("!EndConnectionMessage!")
Server.State.CLOSED
}
is Server.ServerDownMessage<in T> -> Server.State.DOWNING
else -> Server.State.ERROR
}
suspend fun attachClient(client: Socket): Deferred<State> = async {
suspend fun attachClient(client: Socket): Deferred<State> = async {
val (input, output) = client.openIO(log)
try {
val bytes = input.readBytes(BYTES_TOKEN.size)
log.info("bytes : ${bytes.toList()}")
if (bytes.zip(BYTES_TOKEN).any { it.first != it.second }) {
log.info("BAD TOKEN")
return@async Server.State.CLOSED
}
} catch (e: Throwable) {
log.info("NO TOKEN")
return@async Server.State.CLOSED
}
var finalState = Server.State.WORKING
loop@
while (true) {
@@ -72,8 +86,11 @@ interface Server<out T : ServerBase> : ServerBase {
log.info("client accepted! (${client.remoteAddress})")
attachClient(client).invokeOnCompletion {
when (it) {
Server.State.DOWNING -> TODO("DOWN")
Server.State.DOWNING -> {
client.close()
}
else -> {
client.close()
}
}
}

View File

@@ -17,7 +17,7 @@ import java.util.logging.Logger
class ByteReadChannelWrapper(private val readChannel: ByteReadChannel, private val log: Logger) {
private suspend fun readBytes(length: Int) =
suspend fun readBytes(length: Int) =
readChannel.readPacket(length).readBytes()
private suspend fun getObject(length: Int) =
@@ -53,7 +53,7 @@ class ByteReadChannelWrapper(private val readChannel: ByteReadChannel, private v
class ByteWriteChannelWrapper(private val writeChannel: ByteWriteChannel, private val log: Logger) {
private suspend fun printBytes(bytes: ByteArray) {
suspend fun printBytes(bytes: ByteArray) {
bytes.forEach { writeChannel.writeByte(it) }
writeChannel.flush()
}

View File

@@ -9,7 +9,6 @@ import com.intellij.openapi.Disposable
import com.intellij.openapi.util.Disposer
import com.intellij.openapi.vfs.impl.ZipHandler
import com.intellij.openapi.vfs.impl.jar.CoreJarFileSystem
import io.ktor.network.sockets.Socket
import kotlinx.coroutines.experimental.Unconfined
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.runBlocking
@@ -35,9 +34,6 @@ import org.jetbrains.kotlin.daemon.KotlinJvmReplService
import org.jetbrains.kotlin.daemon.LazyClasspathWatcher
import org.jetbrains.kotlin.daemon.common.*
import org.jetbrains.kotlin.daemon.common.experimental.*
import org.jetbrains.kotlin.daemon.common.experimental.socketInfrastructure.ByteWriteChannelWrapper
import org.jetbrains.kotlin.daemon.common.experimental.socketInfrastructure.Report
import org.jetbrains.kotlin.daemon.common.experimental.socketInfrastructure.Server
import org.jetbrains.kotlin.daemon.incremental.experimental.RemoteAnnotationsFileUpdaterAsync
import org.jetbrains.kotlin.daemon.incremental.experimental.RemoteArtifactChangesProviderAsync
@@ -129,6 +125,7 @@ class CompileServiceServerSideImpl(
System.setProperty(KOTLIN_COMPILER_ENVIRONMENT_KEEPALIVE_PROPERTY, "true")
// TODO UNCOMMENT THIS : this.toRMIServer(daemonOptions, compilerId) // also create RMI server in order to support old clients
this.toRMIServer(daemonOptions, compilerId)
timer.schedule(10) {
exceptionLoggingTimerThread { initiateElections() }
@@ -711,12 +708,15 @@ class CompileServiceServerSideImpl(
compilerId,
runFile,
filter = { _, p -> p != port },
report = { _, msg -> log.info(msg) }, useRMI = false
report = { _, msg -> log.info(msg) },
useRMI = false
)
val comparator = compareByDescending<DaemonWithMetadataAsync, DaemonJVMOptions>(
DaemonJVMOptionsMemoryComparator(),
{ it.jvmOptions }
).thenBy(FileAgeComparator()) { it.runFile }
)
.thenBy(FileAgeComparator()) { it.runFile }
.thenBy { it.daemon.serverPort }
aliveWithOpts.maxWith(comparator)?.let { bestDaemonWithMetadata ->
val fattestOpts = bestDaemonWithMetadata.jvmOptions
if (fattestOpts memorywiseFitsInto daemonJVMOptions && FileAgeComparator().compare(

View File

@@ -55,7 +55,10 @@ class CompilerApiTest : KotlinIntegrationTestBase() {
}
private val externalLogFile: File by lazy { createNewLogFile() }
private val log by lazy { Logger.getLogger("test") }
private val log by lazy {
currentLogFile
Logger.getLogger("test")
}
val compilerClassPath = listOf(
File(compilerLibDir, "kotlin-compiler.jar")
@@ -193,8 +196,8 @@ class CompilerApiTest : KotlinIntegrationTestBase() {
log.info("in runBlocking")
KotlinCompilerClient.shutdownCompileService(compilerId, daemonOptions)
}
// currentLogFile.delete()
// externalLogFile.delete()
currentLogFile.delete()
externalLogFile.delete()
}
fun testHelloApp() {
@@ -270,9 +273,6 @@ class CompilerApiTest : KotlinIntegrationTestBase() {
verbose = true,
reportPerf = true
)
val currentLogFile = createTempFile("kotlin-daemon-test.", ".log")
val daemonJVMOptions = configureDaemonJVMOptions(
"D$COMPILE_DAEMON_LOG_PATH_PROPERTY=\"${externalLogFile.loggerCompatiblePath}\"",
inheritMemoryLimits = false, inheritOtherJvmOptions = false, inheritAdditionalProperties = false

View File

@@ -5,82 +5,217 @@
package org.jetbrains.kotlin.daemon.experimental.unit
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.Unconfined
import kotlinx.coroutines.experimental.runBlocking
import org.jetbrains.kotlin.cli.common.CLICompiler
import org.jetbrains.kotlin.cli.js.K2JSCompiler
import org.jetbrains.kotlin.cli.jvm.K2JVMCompiler
import org.jetbrains.kotlin.cli.metadata.K2MetadataCompiler
import org.jetbrains.kotlin.daemon.CompileServiceImpl
import org.jetbrains.kotlin.daemon.CompilerSelector
import org.jetbrains.kotlin.daemon.common.*
import org.jetbrains.kotlin.daemon.common.experimental.walkDaemonsAsync
import org.jetbrains.kotlin.daemon.common.experimental.DaemonWithMetadataAsync
import org.jetbrains.kotlin.daemon.common.experimental.findPortForSocket
import org.jetbrains.kotlin.daemon.common.experimental.walkDaemonsAsync
import org.jetbrains.kotlin.daemon.experimental.CompileServiceServerSideImpl
import org.jetbrains.kotlin.daemon.experimental.KotlinCompileDaemon
import org.jetbrains.kotlin.integration.KotlinIntegrationTestBase
import java.awt.SystemColor.info
import java.io.File
import java.util.*
import java.util.logging.Logger
import kotlin.concurrent.schedule
class ConnectionsTest : KotlinIntegrationTestBase() {
fun testConnectionMechanism() {
val daemonJVMOptions = configureDaemonJVMOptions(
init {
// val logFile = createTempFile("/Users/jetbrains/Documents/kotlin/my_fork/kotlin", ".txt")
// println("log file path : ${logFile.loggerCompatiblePath}")
// val cfg: String =
// "handlers = java.util.logging.FileHandler\n" +
// "java.util.logging.FileHandler.level = ALL\n" +
// "java.util.logging.FileHandler.formatter = java.util.logging.SimpleFormatter\n" +
// "java.util.logging.FileHandler.encoding = UTF-8\n" +
// "java.util.logging.FileHandler.limit = 0\n" + // if file is provided - disabled, else - 1Mb
// "java.util.logging.FileHandler.count = 1\n" +
// "java.util.logging.FileHandler.append = true\n" +
// "java.util.logging.FileHandler.pattern = ${logFile.loggerCompatiblePath}\n" +
// "java.util.logging.SimpleFormatter.format = %1\$tF %1\$tT.%1\$tL [%3\$s] %4\$s: %5\$s%n\n"
// LogManager.getLogManager().readConfiguration(cfg.byteInputStream())
}
private val log by lazy { Logger.getLogger("ConnectionsTest") }
private val daemonJVMOptions
get() = configureDaemonJVMOptions(
inheritMemoryLimits = true,
inheritOtherJvmOptions = true,
inheritAdditionalProperties = true
)
val compilerId = CompilerId()
val daemonOptions = DaemonOptions()
val port = findPortForSocket(
private val compilerId get() = CompilerId()
private val daemonOptions get() = DaemonOptions()
private val port by lazy {
findPortForSocket(
COMPILE_DAEMON_FIND_PORT_ATTEMPTS,
COMPILE_DAEMON_PORTS_RANGE_START,
COMPILE_DAEMON_PORTS_RANGE_END
)
// timer with a daemon thread, meaning it should not prevent JVM to exit normally
val timer = Timer(true)
val compilerService = CompileServiceServerSideImpl(
}
private val timer = Timer(true)
private val runFile: File
get() {
val runFileDir = File(daemonOptions.runFilesPathOrDefault)
runFileDir.mkdirs()
return File(
runFileDir,
makeRunFilenameString(
timestamp = "%tFT%<tH-%<tM-%<tS.%<tLZ".format(Calendar.getInstance(TimeZone.getTimeZone("Z"))),
digest = compilerId.compilerClasspath.map { File(it).absolutePath }.distinctStringsDigest().toHexString(),
port = port.toString()
)
)
}
private val onShutdown: () -> Unit = {
if (daemonOptions.forceShutdownTimeoutMilliseconds != COMPILE_DAEMON_TIMEOUT_INFINITE_MS) {
// running a watcher thread that ensures that if the daemon is not exited normally (may be due to RMI leftovers), it's forced to exit
timer.schedule(daemonOptions.forceShutdownTimeoutMilliseconds) {
cancel()
org.jetbrains.kotlin.daemon.KotlinCompileDaemon.log.info("force JVM shutdown")
System.exit(0)
}
} else {
timer.cancel()
}
}
private fun getNewDaemonsOrAsyncWrappers() = runBlocking(Unconfined) {
walkDaemonsAsync(
File(daemonOptions.runFilesPathOrDefault),
compilerId,
runFile,
filter = { _, _ -> true },
report = { _, msg -> log.info(msg) },
useRMI = true,
useSockets = true
).toList()
}
private fun getOldDaemonsOrRMIWrappers() = runBlocking(Unconfined) {
walkDaemons(
File(daemonOptions.runFilesPathOrDefault),
compilerId,
runFile,
filter = { _, _ -> true },
report = { _, msg -> log.info(msg) }
).toList()
}
private fun runNewServer(): Deferred<Unit> =
CompileServiceServerSideImpl(
port,
compilerId,
daemonOptions,
daemonJVMOptions,
port,
timer,
{
if (daemonOptions.forceShutdownTimeoutMilliseconds != COMPILE_DAEMON_TIMEOUT_INFINITE_MS) {
// running a watcher thread that ensures that if the daemon is not exited normally (may be due to RMI leftovers), it's forced to exit
timer.schedule(daemonOptions.forceShutdownTimeoutMilliseconds) {
cancel()
KotlinCompileDaemon.log.info("force JVM shutdown")
System.exit(0)
}
} else {
timer.cancel()
}
})
compilerService.runServer()
println("service started")
val runFileDir = File(daemonOptions.runFilesPathOrDefault)
runFileDir.mkdirs()
val runFile = File(
runFileDir,
makeRunFilenameString(
timestamp = "%tFT%<tH-%<tM-%<tS.%<tLZ".format(Calendar.getInstance(TimeZone.getTimeZone("Z"))),
digest = compilerId.compilerClasspath.map { File(it).absolutePath }.distinctStringsDigest().toHexString(),
port = port.toString()
)
)
val daemons = runBlocking(Unconfined) {
walkDaemonsAsync(
File(daemonOptions.runFilesPathOrDefault),
compilerId,
runFile,
filter = { _, _ -> true },
report = { _, msg -> println("[report] : " + msg) }
).toList()
onShutdown
).let {
log.info("service created")
it.runServer()
}
println("daemons : $daemons")
private fun runOldServer() {
val (registry, port) = findPortAndCreateRegistry(
COMPILE_DAEMON_FIND_PORT_ATTEMPTS,
COMPILE_DAEMON_PORTS_RANGE_START,
COMPILE_DAEMON_PORTS_RANGE_END
)
val compilerSelector = object : CompilerSelector {
private val jvm by lazy { K2JVMCompiler() }
private val js by lazy { K2JSCompiler() }
private val metadata by lazy { K2MetadataCompiler() }
override fun get(targetPlatform: CompileService.TargetPlatform): CLICompiler<*> = when (targetPlatform) {
CompileService.TargetPlatform.JVM -> jvm
CompileService.TargetPlatform.JS -> js
CompileService.TargetPlatform.METADATA -> metadata
}
}
CompileServiceImpl(
registry = registry,
compiler = compilerSelector,
compilerId = compilerId,
daemonOptions = daemonOptions,
daemonJVMOptions = daemonJVMOptions,
port = port,
timer = timer,
onShutdown = onShutdown
)
}
val comparator = compareByDescending<DaemonWithMetadataAsync, DaemonJVMOptions>(
DaemonJVMOptionsMemoryComparator(),
{ it.jvmOptions }
)
.thenBy(FileAgeComparator()) { it.runFile }
.thenBy { it.daemon.serverPort }
fun testConnectionMEchanism_OldClient_OldServer() {
runOldServer()
val daemons = getOldDaemonsOrRMIWrappers()
log.info("daemons : $daemons")
assert(daemons.isNotEmpty())
val daemon = daemons[0].daemon
println("chosen : $daemon")
val info = runBlocking(Unconfined) { daemon.getDaemonInfo() }
println("info : $info")
log.info("info : $info")
assert(info.isGood)
println("test passed")
}
fun testConnectionMechanism_NewClient_NewServer() {
val runService = runNewServer()
val daemons = getNewDaemonsOrAsyncWrappers()
log.info("daemons : $daemons")
assert(daemons.isNotEmpty())
val daemon = daemons.maxWith(comparator)!!.daemon
println("chosen : $daemon")
val info = runBlocking(Unconfined) { daemon.getDaemonInfo() }
log.info("info : $info")
assert(info.isGood)
println("test passed")
}
fun testConnectionMechanism_OldClient_NewServer() {
val runService = runNewServer()
val daemons = getOldDaemonsOrRMIWrappers()
log.info("daemons : $daemons")
assert(daemons.isNotEmpty())
val daemon = daemons[0].daemon
println("chosen : $daemon")
val info = runBlocking(Unconfined) { daemon.getDaemonInfo() }
log.info("info : $info")
assert(info.isGood)
println("test passed")
}
fun testConnectionMechanism_NewClient_OldServer() {
runOldServer()
val daemons = getNewDaemonsOrAsyncWrappers()
log.info("daemons : $daemons")
assert(daemons.isNotEmpty())
val daemon = daemons.maxWith(comparator)!!.daemon
println("chosen : $daemon")
val info = runBlocking(Unconfined) { daemon.getDaemonInfo() }
log.info("info : $info")
assert(info.isGood)
println("test passed")
}
}