Jak zrobić prostą aplikację z Akka Cluster

Jeśli przeczytałeś moją poprzednią historię o Scalachain, prawdopodobnie zauważyłeś, że jest on daleki od bycia systemem rozproszonym. Brakuje wszystkich funkcji do poprawnej pracy z innymi węzłami. Dodaj do tego, że łańcuch bloków złożony z jednego węzła jest bezużyteczny. Z tego powodu zdecydowałem, że czas popracować nad tym problemem.

Skoro Scalachain jest zasilany przez Akka, dlaczego nie skorzystać z okazji, aby zagrać z Akka Cluster? Stworzyłem prosty projekt, aby trochę majstrować przy Akka Cluster, iw tej historii podzielę się swoimi doświadczeniami. Zamierzamy stworzyć klaster trzech węzłów, używając routerów Cluster Aware Routers, aby zrównoważyć obciążenie między nimi. Wszystko będzie działać w kontenerze Docker, a do łatwego wdrożenia użyjemy docker-compose.

Ok, ruszajmy! ?

Szybkie wprowadzenie do Akka Cluster

Akka Cluster zapewnia doskonałe wsparcie przy tworzeniu aplikacji rozproszonych. Najlepszym przypadkiem użycia jest węzeł, który chcesz replikować N razy w środowisku rozproszonym. Oznacza to, że wszystkie węzły N to węzły równorzędne uruchamiające ten sam kod. Akka Cluster umożliwia natychmiastowe wykrywanie członków w tym samym klastrze. Korzystając z routerów obsługujących klaster, można zrównoważyć komunikaty między aktorami w różnych węzłach. Możliwe jest również wybranie polityki równoważenia, dzięki czemu równoważenie obciążenia to bułka z masłem!

Właściwie możesz wybrać jeden z dwóch typów routerów:

Group Router - aktorzy, którzy mają wysyłać komunikaty do - zwanych trasami - są określani za pomocą ścieżki aktora. Routery współużytkują trasy utworzone w klastrze. W tym przykładzie użyjemy routera grupowego.

Router puli - trasy są tworzone i wdrażane przez router, więc są jego elementami podrzędnymi w hierarchii aktorów. Trasy nie są współdzielone między routerami. Jest to idealne rozwiązanie dla scenariusza z repliką podstawową, w którym każdy router jest podstawowym, a jego trasy są replikami.

To tylko wierzchołek góry lodowej, więc zapraszam do przeczytania oficjalnej dokumentacji, aby uzyskać więcej informacji.

Klaster do obliczeń matematycznych

Wyobraźmy sobie scenariusz użycia. Załóżmy, że zaprojektujemy system do wykonywania obliczeń matematycznych na żądanie. System jest wdrażany online, więc potrzebuje interfejsu API REST do odbierania żądań obliczeniowych. Żądania te obsługuje wewnętrzny procesor, wykonując obliczenia i zwracając wynik.

W tej chwili procesor może obliczyć tylko liczbę Fibonacciego. Decydujemy się na użycie klastra węzłów, aby rozłożyć obciążenie między węzłami i poprawić wydajność. Akka Cluster zajmie się dynamiką klastra i równoważeniem obciążenia między węzłami. Ok, brzmi dobrze!

Hierarchia aktorów

Po pierwsze: musimy zdefiniować naszą hierarchię aktorów. System można podzielić na trzy części funkcjonalne: logikę biznesową , zarządzanie klastrem i sam węzeł . Jest też serwer, ale to nie jest aktor, nad tym popracujemy później.

Logika biznesowa

Aplikacja powinna wykonywać obliczenia matematyczne. Możemy zdefiniować prostego Processoraktora do zarządzania wszystkimi zadaniami obliczeniowymi. Każde obsługiwane przez nas obliczenia można zaimplementować w konkretnym aktorze, który będzie dzieckiem tego Processorjednego. W ten sposób aplikacja jest modułowa i łatwiejsza w rozbudowie i utrzymaniu. W tej chwili jedynym dzieckiem Processorbędzie ProcessorFibonacciaktor. Przypuszczam, że można się domyślić, jakie jest jego zadanie. To powinno wystarczyć na początek.

Zarządzanie klastrem

Do zarządzania klastrem potrzebujemy pliku ClusterManager. Brzmi prosto, prawda? Ten aktor obsługuje wszystko, co jest związane z klastrem, na przykład zwraca jego członków na pytanie. Przydałoby się rejestrować, co dzieje się wewnątrz klastra, więc definiujemy ClusterListeneraktora. Jest to element podrzędny ClusterManageri subskrybuje rejestrowanie zdarzeń klastra.

Węzeł

NodeAktor jest korzeniem naszej hierarchii. Jest to punkt wejścia naszego systemu, który komunikuje się z API. The Processori the ClusterManagersą jego dziećmi, wraz z ProcessorRouteraktorem. To jest system równoważenia obciążenia systemu, który rozkłada obciążenie między Processors. Skonfigurujemy go jako router obsługujący klastry, aby każdy ProcessorRoutermógł wysyłać wiadomości do Processorkażdego węzła.

Realizacja aktora

Czas na realizację naszych aktorów! Najpierw zaimplementujemy aktorów związanych z logiką biznesową systemu. Następnie przechodzimy do aktorów do zarządzania klastrem i aktora głównego ( Node) na końcu.

ProcessorFibonacci

Aktor ten wykonuje obliczenia liczby Fibonacciego. Otrzymuje Computewiadomość zawierającą liczbę do obliczenia i referencję aktora, któremu należy odpowiedzieć. Odniesienie jest ważne, ponieważ mogą występować różne podmioty żądające. Pamiętaj, że pracujemy w środowisku rozproszonym!

Po Computeodebraniu wiadomości fibonaccifunkcja oblicza wynik. Zawijamy go w ProcessorResponseobiekt, aby dostarczyć informacje o węźle, który wykonał obliczenia. Będzie to przydatne później, aby zobaczyć, jak działa zasada działania okrężnego.

Wynik jest następnie wysyłany do aktora, któremu mamy odpowiedzieć. Bułka z masłem.

object ProcessorFibonacci { sealed trait ProcessorFibonacciMessage case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId)) def fibonacci(x: Int): BigInt = { @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match { case 0 => prev case 1 => next case _ => fibHelper(x - 1, next, next + prev) } fibHelper(x) } } class ProcessorFibonacci(nodeId: String) extends Actor { import ProcessorFibonacci._ override def receive: Receive = { case Compute(value, replyTo) => { replyTo ! ProcessorResponse(nodeId, fibonacci(value)) } } }

Edytor

ProcessorAktor udaje konkretne sub-procesory, jak ten Fibonacciego. Powinien tworzyć instancje podprzetwarzających i przekazywać im żądania. Teraz mamy tylko jeden sub-procesor, dzięki czemu Processorotrzymuje jeden rodzaj wiadomości: ComputeFibonacci. Ta wiadomość zawiera liczbę Fibonacciego do obliczenia. Po odebraniu numer do obliczenia jest wysyłany do a FibonacciProcessor, wraz z odniesieniem do pliku sender().

object Processor { sealed trait ProcessorMessage case class ComputeFibonacci(n: Int) extends ProcessorMessage def props(nodeId: String) = Props(new Processor(nodeId)) } class Processor(nodeId: String) extends Actor { import Processor._ val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci") override def receive: Receive = { case ComputeFibonacci(value) => { val replyTo = sender() fibonacciProcessor ! Compute(value, replyTo) } } }

ClusterListener

Chcielibyśmy rejestrować przydatne informacje o tym, co dzieje się w klastrze. Może nam to pomóc w debugowaniu systemu, jeśli zajdzie taka potrzeba. Taki jest cel ClusterListeneraktora. Przed rozpoczęciem subskrybuje komunikaty o zdarzeniach klastra. Do reaguje aktora do wiadomości lubi MemberUp, UnreachableMemberalbo MemberRemoved, rejestrowanie odpowiedniego zdarzenia. Gdy ClusterListenerjest zatrzymany, anuluje subskrypcję zdarzeń klastra.

object ClusterListener { def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster)) } class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging { override def preStart(): Unit = { cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) } override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case MemberUp(member) => log.info("Node {} - Member is Up: {}", nodeId, member.address) case UnreachableMember(member) => log.info(s"Node {} - Member detected as unreachable: {}", nodeId, member) case MemberRemoved(member, previousStatus) => log.info(s"Node {} - Member is Removed: {} after {}", nodeId, member.address, previousStatus) case _: MemberEvent => // ignore } }

ClusterManager

Podmiotem odpowiedzialnym za zarządzanie klastrem jest ClusterManager. Tworzy ClusterListeneraktora i na żądanie udostępnia listę członków klastra. Można go rozszerzyć, aby dodać więcej funkcji, ale na razie to wystarczy.

object ClusterManager { sealed trait ClusterMessage case object GetMembers extends ClusterMessage def props(nodeId: String) = Props(new ClusterManager(nodeId)) } class ClusterManager(nodeId: String) extends Actor with ActorLogging { val cluster: Cluster = Cluster(context.system) val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener") override def receive: Receive = { case GetMembers => { sender() ! cluster.state.members.filter(_.status == MemberStatus.up) .map(_.address.toString) .toList } } }

ProcessorRouter

Równoważenie obciążenia między procesorami jest obsługiwane przez ProcessorRouter. Tworzy go Nodeaktor, ale tym razem wszystkie wymagane informacje są podane w konfiguracji systemu.

class Node(nodeId: String) extends Actor { //... val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") //... }

Przeanalizujmy odpowiednią część w application.confpliku.

akka { actor { ... deployment { /node/processorRouter { router = round-robin-group routees.paths = ["/user/node/processor"] cluster { enabled = on allow-local-routees = on } } } } ... }

Pierwszą rzeczą jest określenie ścieżki do aktora routera, to znaczy /node/processorRouter. Wewnątrz tej właściwości możemy skonfigurować zachowanie routera:

  • router: jest to zasada równoważenia obciążenia wiadomości. Wybrałem round-robin-group, ale jest wiele innych.
  • routees.paths: są to ścieżki do aktorów, którzy będą odbierać wiadomości obsługiwane przez router. Mówimy: „Kiedy otrzymujesz wiadomość, szukaj aktorów odpowiadających tym ścieżkom. Wybierz jeden zgodnie z zasadami i przekaż mu wiadomość ”. Ponieważ używamy routerów obsługujących klaster, trasy mogą znajdować się w dowolnym węźle klastra.
  • cluster.enabled: czy działamy w klastrze? Odpowiedź brzmi onoczywiście!
  • cluster.allow-local-routees: tutaj pozwalamy routerowi wybrać rutę w swoim węźle.

Korzystając z tej konfiguracji, możemy stworzyć router, aby zrównoważyć pracę między naszymi procesorami.

Węzeł

Podstawą naszej hierarchii aktorów jest Node. Tworzy dzieci-aktorów - ClusterManager, Processori ProcessorRouter- i przekazuje wiadomości do właściwego. Nie ma tu nic skomplikowanego.

object Node { sealed trait NodeMessage case class GetFibonacci(n: Int) case object GetClusterMembers def props(nodeId: String) = Props(new Node(nodeId)) } class Node(nodeId: String) extends Actor { val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor") val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager") override def receive: Receive = { case GetClusterMembers => clusterManager forward GetMembers case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value) } }

Serwer i API

Every node of our cluster runs a server able to receive requests. The Server creates our actor system and is configured through the application.conf file.

object Server extends App with NodeRoutes { implicit val system: ActorSystem = ActorSystem("cluster-playground") implicit val materializer: ActorMaterializer = ActorMaterializer() val config: Config = ConfigFactory.load() val address = config.getString("http.ip") val port = config.getInt("http.port") val nodeId = config.getString("clustering.ip") val node: ActorRef = system.actorOf(Node.props(nodeId), "node") lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes Http().bindAndHandle(routes, address, port) println(s"Node $nodeId is listening at //$address:$port") Await.result(system.whenTerminated, Duration.Inf) }

Akka HTTP powers the server itself and the REST API, exposing three simple endpoints. These endpoints are defined in the NodeRoutes trait.

The first one is /health, to check the health of a node. It responds with a 200 OK if the node is up and running

lazy val healthRoute: Route = pathPrefix("health") { concat( pathEnd { concat( get { complete(StatusCodes.OK) } ) } ) }

The /status/members endpoint responds with the current active members of the cluster.

lazy val statusRoutes: Route = pathPrefix("status") { concat( pathPrefix("members") { concat( pathEnd { concat( get { val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]] onSuccess(membersFuture) { members => complete(StatusCodes.OK, members) } } ) } ) } ) }

The last (but not the least) is the /process/fibonacci/n endpoint, used to request the Fibonacci number of n.

lazy val processRoutes: Route = pathPrefix("process") { concat( pathPrefix("fibonacci") { concat( path(IntNumber) { n => pathEnd { concat( get { val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse] onSuccess(processFuture) { response => complete(StatusCodes.OK, response) } } ) } } ) } ) }

It responds with a ProcessorResponse containing the result, along with the id of the node where the computation took place.

Cluster Configuration

Once we have all our actors, we need to configure the system to run as a cluster! The application.conf file is where the magic takes place. I’m going to split it in pieces to present it better, but you can find the complete file here.

Let’s start defining some useful variables.

clustering { ip = "127.0.0.1" ip = ${?CLUSTER_IP} port = 2552 port = ${?CLUSTER_PORT} seed-ip = "127.0.0.1" seed-ip = ${?CLUSTER_SEED_IP} seed-port = 2552 seed-port = ${?CLUSTER_SEED_PORT} cluster.name = "cluster-playground" }

Here we are simply defining the ip and port of the nodes and the seed, as well as the cluster name. We set a default value, then we override it if a new one is specified. The configuration of the cluster is the following.

akka { actor { provider = "cluster" ... /* router configuration */ ... } remote { log-remote-lifecycle-events = on netty.tcp { hostname = ${clustering.ip} port = ${clustering.port} } } cluster { seed-nodes = [ "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port} ] auto-down-unreachable-after = 10s } } ... /* server vars */ ... /* cluster vars */ }

Akka Cluster is build on top of Akka Remoting, so we need to configure it properly. First of all, we specify that we are going to use Akka Cluster saying that provider = "cluster". Then we bind cluster.ip and cluster.port to the hostname and port of the netty web framework.

The cluster requires some seed nodes as its entry points. We set them in the seed-nodes array, in the format akka.tcp://"{clustering.cluster.name}"@"{clustering.seed-ip}":”${clustering.seed-port}”. Right now we have one seed node, but we may add more later.

The auto-down-unreachable-after property sets a member as down after it is unreachable for a period of time. This should be used only during development, as explained in the official documentation.

Ok, the cluster is configured, we can move to the next step: Dockerization and deployment!

Dockerization and deployment

To create the Docker container of our node we can use sbt-native-packager. Its installation is easy: add addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15") to the plugin.sbt file in the project/ folder. This amazing tool has a plugin for the creation of Docker containers. it allows us to configure the properties of our Dockerfile in the build.sbt file.

// other build.sbt properties enablePlugins(JavaAppPackaging) enablePlugins(DockerPlugin) enablePlugins(AshScriptPlugin) mainClass in Compile := Some("com.elleflorio.cluster.playground.Server") dockerBaseImage := "java:8-jre-alpine" version in Docker := "latest" dockerExposedPorts := Seq(8000) dockerRepository := Some("elleflorio")

Once we have setup the plugin, we can create the docker image running the command sbt docker:publishLocal. Run the command and taste the magic… ?

We have the Docker image of our node, now we need to deploy it and check that everything works fine. The easiest way is to create a docker-compose file that will spawn a seed and a couple of other nodes.

version: '3.5' networks: cluster-network: services: seed: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '2552:2552' - '8000:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: seed CLUSTER_SEED_IP: seed node1: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8001:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node1 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552 node2: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8002:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node2 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552

I won’t spend time going through it, since it is quite simple.

Let’s run it!

Time to test our work! Once we run the docker-compose up command, we will have a cluster of three nodes up and running. The seed will respond to requests at port :8000, while node1 and node2 at port :8001 and :8002. Play a bit with the various endpoints. You will see that the requests for a Fibonacci number will be computed by a different node each time, following a round-robin policy. That’s good, we are proud of our work and can get out for a beer to celebrate! ?

Conclusion

We are done here! We learned a lot of things in these ten minutes:

  • What Akka Cluster is and what can do for us.
  • How to create a distributed application with it.
  • How to configure a Group Router for load-balancing in the cluster.
  • How to Dockerize everything and deploy it using docker-compose.

You can find the complete application in my GitHub repo. Feel free to contribute or play with it as you like! ?

See you! ?