From da0afc19d267cee00dbd666fe5eb56eb5785f0bf Mon Sep 17 00:00:00 2001 From: Brett Morien Date: Thu, 17 Sep 2020 09:55:18 -0700 Subject: [PATCH 1/4] Fix for cycling error messages when source disappears. --- .../orbit/server/mesh/ClusterManager.kt | 10 +++ .../orbit/server/pipeline/step/RetryStep.kt | 64 ----------------- .../orbit/server/pipeline/step/RoutingStep.kt | 14 ++-- .../test/kotlin/orbit/server/PipelineTests.kt | 72 ++++++++++++------- 4 files changed, 67 insertions(+), 93 deletions(-) delete mode 100644 src/orbit-server/src/main/kotlin/orbit/server/pipeline/step/RetryStep.kt diff --git a/src/orbit-server/src/main/kotlin/orbit/server/mesh/ClusterManager.kt b/src/orbit-server/src/main/kotlin/orbit/server/mesh/ClusterManager.kt index 68d417b61..96adf5a42 100644 --- a/src/orbit-server/src/main/kotlin/orbit/server/mesh/ClusterManager.kt +++ b/src/orbit-server/src/main/kotlin/orbit/server/mesh/ClusterManager.kt @@ -6,6 +6,7 @@ package orbit.server.mesh +import mu.KotlinLogging import orbit.server.OrbitServerConfig import orbit.shared.exception.InvalidChallengeException import orbit.shared.exception.InvalidNodeId @@ -31,6 +32,7 @@ class ClusterManager( private val clock: Clock, private val nodeDirectory: NodeDirectory ) { + private val logger = KotlinLogging.logger { } private val leaseExpiration = config.nodeLeaseDuration private val clusterNodes = ConcurrentHashMap() private val nodeGraph = AtomicReference>() @@ -124,6 +126,14 @@ class ClusterManager( fun findRoute(sourceNode: NodeId, targetNode: NodeId): List { val graph = nodeGraph.get() ?: buildGraph() + if (!graph.containsVertex(sourceNode)) { + logger.info { "Source node ${sourceNode} not found in cluster." } + return emptyList() + } + if (!graph.containsVertex(targetNode)) { + logger.info { "Target node ${targetNode} not found in cluster." } + return emptyList() + } val path = DijkstraShortestPath.findPathBetween(graph, sourceNode, targetNode) return path?.vertexList?.drop(1) ?: emptyList() } diff --git a/src/orbit-server/src/main/kotlin/orbit/server/pipeline/step/RetryStep.kt b/src/orbit-server/src/main/kotlin/orbit/server/pipeline/step/RetryStep.kt deleted file mode 100644 index 944320e74..000000000 --- a/src/orbit-server/src/main/kotlin/orbit/server/pipeline/step/RetryStep.kt +++ /dev/null @@ -1,64 +0,0 @@ -/* - Copyright (C) 2015 - 2020 Electronic Arts Inc. All rights reserved. - This file is part of the Orbit Project . - See license in LICENSE. - */ - -package orbit.server.pipeline.step - -import io.micrometer.core.instrument.Counter -import io.micrometer.core.instrument.Metrics -import orbit.server.OrbitServerConfig -import orbit.server.mesh.ClusterManager -import orbit.server.pipeline.PipelineContext -import orbit.server.service.Meters -import orbit.shared.net.Message -import orbit.shared.net.MessageContent -import orbit.shared.net.MessageTarget - -class RetryStep( - private val clusterManager: ClusterManager, - private val config: OrbitServerConfig -) : PipelineStep { - private val retryAttempts: Counter - private val retryErrors: Counter - - init { - retryAttempts = Metrics.counter(Meters.Names.RetryAttempts) - retryErrors = Metrics.counter(Meters.Names.RetryErrors) - } - - override suspend fun onOutbound(context: PipelineContext, msg: Message) { - val target = when (val target = msg.target) { - is MessageTarget.Unicast -> { - target.targetNode - } - is MessageTarget.RoutedUnicast -> { - target.route.nextNode - } - else -> null - } - - if (target != null) { - if (clusterManager.getNode(target) == null) { - context.pushNew( - if (msg.attempts < config.messageRetryAttempts) { - retryAttempts.increment() - msg.copy( - attempts = msg.attempts + 1 - ) - } else { - retryErrors.increment() - msg.copy( - content = MessageContent.Error("Failed to deliver message after ${msg.attempts} attempts"), - target = MessageTarget.Unicast(msg.source!!) - ) - } - ) - return - } - } - - context.next(msg) - } -} \ No newline at end of file diff --git a/src/orbit-server/src/main/kotlin/orbit/server/pipeline/step/RoutingStep.kt b/src/orbit-server/src/main/kotlin/orbit/server/pipeline/step/RoutingStep.kt index b81aab126..321aca04c 100644 --- a/src/orbit-server/src/main/kotlin/orbit/server/pipeline/step/RoutingStep.kt +++ b/src/orbit-server/src/main/kotlin/orbit/server/pipeline/step/RoutingStep.kt @@ -65,19 +65,25 @@ class RoutingStep( } fun retry(context: PipelineContext, msg: Message) { - context.pushNew( + val nextMsg = if (msg.attempts < config.messageRetryAttempts) { retryAttempts.increment() msg.copy( attempts = msg.attempts + 1 ) - } else { + } else if (msg.content !is MessageContent.Error) { retryErrors.increment() msg.copy( content = MessageContent.Error("Failed to deliver message after ${msg.attempts} attempts"), - target = MessageTarget.Unicast(msg.source!!) + target = MessageTarget.Unicast(msg.source!!), + attempts = 0 ) + } else { + null } - ) + + if (nextMsg != null) { + context.pushNew(nextMsg) + } } } \ No newline at end of file diff --git a/src/orbit-server/src/test/kotlin/orbit/server/PipelineTests.kt b/src/orbit-server/src/test/kotlin/orbit/server/PipelineTests.kt index e6c8f7e49..3c20ecb65 100644 --- a/src/orbit-server/src/test/kotlin/orbit/server/PipelineTests.kt +++ b/src/orbit-server/src/test/kotlin/orbit/server/PipelineTests.kt @@ -6,6 +6,7 @@ package orbit.server +//import orbit.server.pipeline.step.RetryStep import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.anyOrNull import com.nhaarman.mockitokotlin2.argThat @@ -14,17 +15,14 @@ import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.never import com.nhaarman.mockitokotlin2.verify import kotlinx.coroutines.runBlocking -import orbit.server.mesh.ClusterManager import orbit.server.pipeline.PipelineContext -import orbit.server.pipeline.step.RetryStep -import orbit.shared.mesh.NodeCapabilities +import orbit.server.pipeline.step.RoutingStep +import orbit.server.router.Router import orbit.shared.mesh.NodeId -import orbit.shared.mesh.NodeInfo -import orbit.shared.mesh.NodeLease -import orbit.shared.mesh.NodeStatus import orbit.shared.net.Message import orbit.shared.net.MessageContent import orbit.shared.net.MessageTarget +import orbit.shared.router.Route import org.junit.Test class PipelineTests : BaseServerTest() { @@ -38,13 +36,14 @@ class PipelineTests : BaseServerTest() { target = MessageTarget.Unicast(testNode) ) - val clusterManager = mock() { - onBlocking { getNode(testNode) } doReturn null + val router = mock { + onBlocking { findRoute(testNode, null) } doReturn (Route(emptyList())) } - val retryStep = RetryStep(clusterManager, OrbitServerConfig()) + + val routingStep = RoutingStep(router, OrbitServerConfig()) val context = mock() - retryStep.onOutbound(context, message) + routingStep.onOutbound(context, message) verify(context, never()).next(any()) verify(context).pushNew(message.copy(attempts = 1)) @@ -55,26 +54,22 @@ class PipelineTests : BaseServerTest() { fun `When a node is present in cluster, message is sent to next step`() { runBlocking { val testNode = NodeId("test", "test") + val route = Route(listOf(testNode)) val message = Message( mock(), target = MessageTarget.Unicast(testNode) ) - val clusterManager = mock { - onBlocking { getNode(testNode) } doReturn - NodeInfo( - testNode, - NodeCapabilities(listOf("test")), - lease = NodeLease.forever, - nodeStatus = NodeStatus.ACTIVE - ) + val router = mock { + onBlocking { findRoute(testNode, null) } doReturn route } - val retryStep = RetryStep(clusterManager, OrbitServerConfig()) + + val routingStep = RoutingStep(router, OrbitServerConfig()) val context = mock() - retryStep.onOutbound(context, message) + routingStep.onOutbound(context, message) - verify(context).next(message) + verify(context).next(message.copy(target = MessageTarget.RoutedUnicast(route))) verify(context, never()).pushNew(any(), anyOrNull()) } } @@ -91,16 +86,43 @@ class PipelineTests : BaseServerTest() { attempts = 11 ) - val clusterManager = mock { - onBlocking { getNode(testNode) } doReturn null + val router = mock { + onBlocking { findRoute(testNode, null) } doReturn (Route(emptyList())) } - val retryStep = RetryStep(clusterManager, OrbitServerConfig(messageRetryAttempts = 10)) + + val routingStep = RoutingStep(router, OrbitServerConfig()) val context = mock() - retryStep.onOutbound(context, message) + routingStep.onOutbound(context, message) verify(context, never()).next(any()) verify(context).pushNew(argThat { content is MessageContent.Error }, anyOrNull()) } } + + @Test + fun `When an error message exceeds retry attempts, the message is discarded`() { + runBlocking { + val testNode = NodeId("test", "test") + val sourceNode = NodeId("source", "test") + val message = Message( + mock(), + target = MessageTarget.Unicast(testNode), + source = sourceNode, + attempts = 11 + ) + + val router = mock { + onBlocking { findRoute(testNode, null) } doReturn (Route(emptyList())) + } + + val routingStep = RoutingStep(router, OrbitServerConfig()) + + val context = mock() + routingStep.onOutbound(context, message) + + verify(context, never()).next(any()) + verify(context, never()).pushNew(any(), anyOrNull()) + } + } } From df38edd192df52767e5759d569ab26efa4bd2e85 Mon Sep 17 00:00:00 2001 From: Brett Morien Date: Thu, 17 Sep 2020 09:56:03 -0700 Subject: [PATCH 2/4] Fix docker settings and include prometheus meter endpoint. --- docker/docker-compose.yml | 5 +++-- docker/local-compose-orbit-settings.json | 7 +++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 0531db3e7..ff96ecdcb 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -6,14 +6,15 @@ services: dockerfile: docker/server/Dockerfile ports: - "5006:5005" + - "8080:8080" expose: - 50056 environment: ORBIT_SETTINGS: /etc/orbit/orbit.json ORBIT_URL: "orbit-server-1:50056" ORBIT_PORT: 50056 - ETCD_ADDRESSABLE_DIR: http://node-directory:2379 - ETCD_NODE_DIR: http://addressable-directory:2379 + ETCD_ADDRESSABLE_DIR: http://addressable-directory:2379 + ETCD_NODE_DIR: http://node-directory:2379 entrypoint: sh ./opt/orbit/entrypoint.sh volumes: - ../src/orbit-application/build/libs:/opt/orbit/libs diff --git a/docker/local-compose-orbit-settings.json b/docker/local-compose-orbit-settings.json index 848335976..cad7483c0 100644 --- a/docker/local-compose-orbit-settings.json +++ b/docker/local-compose-orbit-settings.json @@ -14,5 +14,12 @@ { "url": "${ETCD_ADDRESSABLE_DIR:-0.0.0.0}" } + ], + "meterRegistry": [ + "orbit.server.prometheus.PrometheusMetrics$PrometheusMetricsConfig", + { + "url": "/metrics", + "port": 8080 + } ] } \ No newline at end of file From 3bccf03c47a5f2f9087e9f235f4551607ce35895 Mon Sep 17 00:00:00 2001 From: Brett Morien Date: Thu, 17 Sep 2020 09:56:37 -0700 Subject: [PATCH 3/4] Fix a disconnection condition problem. --- .../src/main/kotlin/orbit/client/net/ConnectionHandler.kt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/orbit-client/src/main/kotlin/orbit/client/net/ConnectionHandler.kt b/src/orbit-client/src/main/kotlin/orbit/client/net/ConnectionHandler.kt index 6b6186ef7..4002f86f1 100644 --- a/src/orbit-client/src/main/kotlin/orbit/client/net/ConnectionHandler.kt +++ b/src/orbit-client/src/main/kotlin/orbit/client/net/ConnectionHandler.kt @@ -59,7 +59,9 @@ internal class ConnectionHandler( fun disconnect() { if (::connectionChannel.isInitialized) { - connectionChannel.close() + if (!connectionChannel.isClosedForReceive) { + connectionChannel.close() + } messageRails.stopWorkers() } } @@ -86,8 +88,7 @@ internal class ConnectionHandler( disconnect() connect() } - } - else { + } else { logger.debug { "Testing connection but is not initialized" } } } From 973adb29cdf563b13f667e2ff3548410252f6ee8 Mon Sep 17 00:00:00 2001 From: Brett Morien Date: Thu, 17 Sep 2020 13:25:03 -0700 Subject: [PATCH 4/4] Better handle missing route to destination error, report and eat exceptions at the rail worker level to avoid exhausting workers. --- .../kotlin/orbit/server/mesh/ClusterManager.kt | 15 +++++++++++---- .../kotlin/orbit/util/concurrent/RailWorker.kt | 6 +++++- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/orbit-server/src/main/kotlin/orbit/server/mesh/ClusterManager.kt b/src/orbit-server/src/main/kotlin/orbit/server/mesh/ClusterManager.kt index 96adf5a42..be2e79654 100644 --- a/src/orbit-server/src/main/kotlin/orbit/server/mesh/ClusterManager.kt +++ b/src/orbit-server/src/main/kotlin/orbit/server/mesh/ClusterManager.kt @@ -126,16 +126,23 @@ class ClusterManager( fun findRoute(sourceNode: NodeId, targetNode: NodeId): List { val graph = nodeGraph.get() ?: buildGraph() + if (!graph.containsVertex(sourceNode)) { - logger.info { "Source node ${sourceNode} not found in cluster." } + logger.debug { "Source node $sourceNode not found in cluster." } return emptyList() } if (!graph.containsVertex(targetNode)) { - logger.info { "Target node ${targetNode} not found in cluster." } + logger.debug { "Target node $targetNode not found in cluster." } return emptyList() } - val path = DijkstraShortestPath.findPathBetween(graph, sourceNode, targetNode) - return path?.vertexList?.drop(1) ?: emptyList() + + return try { + val path = DijkstraShortestPath.findPathBetween(graph, sourceNode, targetNode) + path?.vertexList?.drop(1) + } catch (e: RuntimeException) { + logger.debug { "Could not find path between source and target nodes. $e" } + null + } ?: emptyList() } private fun buildGraph(): Graph { diff --git a/src/orbit-util/src/main/kotlin/orbit/util/concurrent/RailWorker.kt b/src/orbit-util/src/main/kotlin/orbit/util/concurrent/RailWorker.kt index ecec743e0..e33dbfdad 100644 --- a/src/orbit-util/src/main/kotlin/orbit/util/concurrent/RailWorker.kt +++ b/src/orbit-util/src/main/kotlin/orbit/util/concurrent/RailWorker.kt @@ -38,7 +38,11 @@ class RailWorker( workers = List(railCount) { scope.launch { for (msg in chan) { - onMessage(msg) + try { + onMessage(msg) + } catch (e: Throwable) { + logger?.warn { "Error: Exception caught in rail worker ${e}" } + } } } }