Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ services:
dockerfile: docker/server/Dockerfile
ports:
- "5006:5005"
- "8080:8080"
expose:
- 50056
environment:
ORBIT_SETTINGS: /etc/orbit/orbit.json
ORBIT_URL: "orbit-server-1:50056"
ORBIT_PORT: 50056
ETCD_ADDRESSABLE_DIR: http://node-directory:2379
ETCD_NODE_DIR: http://addressable-directory:2379
ETCD_ADDRESSABLE_DIR: http://addressable-directory:2379
ETCD_NODE_DIR: http://node-directory:2379
entrypoint: sh ./opt/orbit/entrypoint.sh
volumes:
- ../src/orbit-application/build/libs:/opt/orbit/libs
Expand Down
7 changes: 7 additions & 0 deletions docker/local-compose-orbit-settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,12 @@
{
"url": "${ETCD_ADDRESSABLE_DIR:-0.0.0.0}"
}
],
"meterRegistry": [
"orbit.server.prometheus.PrometheusMetrics$PrometheusMetricsConfig",
{
"url": "/metrics",
"port": 8080
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ internal class ConnectionHandler(

fun disconnect() {
if (::connectionChannel.isInitialized) {
connectionChannel.close()
if (!connectionChannel.isClosedForReceive) {
connectionChannel.close()
}
messageRails.stopWorkers()
}
}
Expand All @@ -86,8 +88,7 @@ internal class ConnectionHandler(
disconnect()
connect()
}
}
else {
} else {
logger.debug { "Testing connection but is not initialized" }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package orbit.server.mesh

import mu.KotlinLogging
import orbit.server.OrbitServerConfig
import orbit.shared.exception.InvalidChallengeException
import orbit.shared.exception.InvalidNodeId
Expand All @@ -31,6 +32,7 @@ class ClusterManager(
private val clock: Clock,
private val nodeDirectory: NodeDirectory
) {
private val logger = KotlinLogging.logger { }
private val leaseExpiration = config.nodeLeaseDuration
private val clusterNodes = ConcurrentHashMap<NodeId, NodeInfo>()
private val nodeGraph = AtomicReference<Graph<NodeId, DefaultEdge>>()
Expand Down Expand Up @@ -124,8 +126,23 @@ class ClusterManager(

fun findRoute(sourceNode: NodeId, targetNode: NodeId): List<NodeId> {
val graph = nodeGraph.get() ?: buildGraph()
val path = DijkstraShortestPath.findPathBetween(graph, sourceNode, targetNode)
return path?.vertexList?.drop(1) ?: emptyList()

if (!graph.containsVertex(sourceNode)) {
logger.debug { "Source node $sourceNode not found in cluster." }
return emptyList()
}
if (!graph.containsVertex(targetNode)) {
logger.debug { "Target node $targetNode not found in cluster." }
return emptyList()
}

return try {
val path = DijkstraShortestPath.findPathBetween(graph, sourceNode, targetNode)
path?.vertexList?.drop(1)
} catch (e: RuntimeException) {
logger.debug { "Could not find path between source and target nodes. $e" }
null
} ?: emptyList()
}

private fun buildGraph(): Graph<NodeId, DefaultEdge> {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,25 @@ class RoutingStep(
}

fun retry(context: PipelineContext, msg: Message) {
context.pushNew(
val nextMsg =
if (msg.attempts < config.messageRetryAttempts) {
retryAttempts.increment()
msg.copy(
attempts = msg.attempts + 1
)
} else {
} else if (msg.content !is MessageContent.Error) {
retryErrors.increment()
msg.copy(
content = MessageContent.Error("Failed to deliver message after ${msg.attempts} attempts"),
target = MessageTarget.Unicast(msg.source!!)
target = MessageTarget.Unicast(msg.source!!),
attempts = 0
)
} else {
null
}
)

if (nextMsg != null) {
context.pushNew(nextMsg)
}
}
}
72 changes: 47 additions & 25 deletions src/orbit-server/src/test/kotlin/orbit/server/PipelineTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package orbit.server

//import orbit.server.pipeline.step.RetryStep
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.anyOrNull
import com.nhaarman.mockitokotlin2.argThat
Expand All @@ -14,17 +15,14 @@ import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.never
import com.nhaarman.mockitokotlin2.verify
import kotlinx.coroutines.runBlocking
import orbit.server.mesh.ClusterManager
import orbit.server.pipeline.PipelineContext
import orbit.server.pipeline.step.RetryStep
import orbit.shared.mesh.NodeCapabilities
import orbit.server.pipeline.step.RoutingStep
import orbit.server.router.Router
import orbit.shared.mesh.NodeId
import orbit.shared.mesh.NodeInfo
import orbit.shared.mesh.NodeLease
import orbit.shared.mesh.NodeStatus
import orbit.shared.net.Message
import orbit.shared.net.MessageContent
import orbit.shared.net.MessageTarget
import orbit.shared.router.Route
import org.junit.Test

class PipelineTests : BaseServerTest() {
Expand All @@ -38,13 +36,14 @@ class PipelineTests : BaseServerTest() {
target = MessageTarget.Unicast(testNode)
)

val clusterManager = mock<ClusterManager>() {
onBlocking { getNode(testNode) } doReturn null
val router = mock<Router> {
onBlocking { findRoute(testNode, null) } doReturn (Route(emptyList()))
}
val retryStep = RetryStep(clusterManager, OrbitServerConfig())

val routingStep = RoutingStep(router, OrbitServerConfig())

val context = mock<PipelineContext>()
retryStep.onOutbound(context, message)
routingStep.onOutbound(context, message)

verify(context, never()).next(any())
verify(context).pushNew(message.copy(attempts = 1))
Expand All @@ -55,26 +54,22 @@ class PipelineTests : BaseServerTest() {
fun `When a node is present in cluster, message is sent to next step`() {
runBlocking {
val testNode = NodeId("test", "test")
val route = Route(listOf(testNode))
val message = Message(
mock<MessageContent.InvocationRequest>(),
target = MessageTarget.Unicast(testNode)
)

val clusterManager = mock<ClusterManager> {
onBlocking { getNode(testNode) } doReturn
NodeInfo(
testNode,
NodeCapabilities(listOf("test")),
lease = NodeLease.forever,
nodeStatus = NodeStatus.ACTIVE
)
val router = mock<Router> {
onBlocking { findRoute(testNode, null) } doReturn route
}
val retryStep = RetryStep(clusterManager, OrbitServerConfig())

val routingStep = RoutingStep(router, OrbitServerConfig())

val context = mock<PipelineContext>()
retryStep.onOutbound(context, message)
routingStep.onOutbound(context, message)

verify(context).next(message)
verify(context).next(message.copy(target = MessageTarget.RoutedUnicast(route)))
verify(context, never()).pushNew(any(), anyOrNull())
}
}
Expand All @@ -91,16 +86,43 @@ class PipelineTests : BaseServerTest() {
attempts = 11
)

val clusterManager = mock<ClusterManager> {
onBlocking { getNode(testNode) } doReturn null
val router = mock<Router> {
onBlocking { findRoute(testNode, null) } doReturn (Route(emptyList()))
}
val retryStep = RetryStep(clusterManager, OrbitServerConfig(messageRetryAttempts = 10))

val routingStep = RoutingStep(router, OrbitServerConfig())

val context = mock<PipelineContext>()
retryStep.onOutbound(context, message)
routingStep.onOutbound(context, message)

verify(context, never()).next(any())
verify(context).pushNew(argThat { content is MessageContent.Error }, anyOrNull())
}
}

@Test
fun `When an error message exceeds retry attempts, the message is discarded`() {
runBlocking {
val testNode = NodeId("test", "test")
val sourceNode = NodeId("source", "test")
val message = Message(
mock<MessageContent.Error>(),
target = MessageTarget.Unicast(testNode),
source = sourceNode,
attempts = 11
)

val router = mock<Router> {
onBlocking { findRoute(testNode, null) } doReturn (Route(emptyList()))
}

val routingStep = RoutingStep(router, OrbitServerConfig())

val context = mock<PipelineContext>()
routingStep.onOutbound(context, message)

verify(context, never()).next(any())
verify(context, never()).pushNew(any(), anyOrNull())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ class RailWorker<T>(
workers = List(railCount) {
scope.launch {
for (msg in chan) {
onMessage(msg)
try {
onMessage(msg)
} catch (e: Throwable) {
logger?.warn { "Error: Exception caught in rail worker ${e}" }
}
}
}
}
Expand Down