Scala+Akka 実践ティップス

実践的なコードによるScalaティップス

当初、Scalaの導入目的は、オープンソースアプリのバグ修正・コード改訂でしたが、アクターモデルによるアルゴリズムがデータマネージメントアプリを作成するのに有用だと考え、より深く学習することにしました。導入時のややっこしい内容を、実際のコードで理解するためのティップスをここに纏めます。サンプルコード内でのActorは、旧版(Classic)のAPIにより記述されていることが多いため、時間があれば新APIに変換してみて下さい。新旧API混在での記述もサポートしています。

パターンマッチング

Name Binding

def requestMoreInfo(p: Person): String = { ... }
val bob = Person("Bob", 34, List("Inception", "The Departed"))
val bobsInfo = bob match {
    case Person(name, age, movies) => s"$name's info: ${requestMoreInfo(Person(name, age, movies))}"
}

“p @” による代替

val bobsInfo = bob match {
    case p @ Person(name, _, _) => s"$name's info: ${requestMoreInfo(p)}"
}

“movies @” による代替

val bobsInception = bob match {
    case Person(name, _, movies @ List("Inception", _*)) => s"$name REALLY likes Inception, some other movies too: $movies"
}

Akkaアクターシステムの設定とSLF4Jによるアクターのログ取得

アクター間のメッセージの遣り取りを、SLF4Jによるログシステムにより確認します。スケジューラーにより5秒毎にメッセージの遣り取りが実行されます。

chapter-conf-deploy

build.sbt

name := "deploy"

version := "1.0"

organization := "manning"

scalacOptions ++= Seq(
  "-deprecation",
  "-unchecked",
  "-Xlint",
  "-Ywarn-unused",
  "-Ywarn-dead-code",
  "-feature",
  "-language:_"
)

enablePlugins(JavaAppPackaging)

scriptClasspath +="../conf"

libraryDependencies ++= {
  val akkaVersion = "2.6.10"
  Seq(
    "com.typesafe.akka" %%  "akka-actor"      % akkaVersion,
    "com.typesafe.akka" %%  "akka-slf4j"      % akkaVersion,
    "ch.qos.logback"     %  "logback-classic" % "1.2.3",
    "com.typesafe.akka" %%  "akka-testkit"    % akkaVersion   % "test",
    "org.scalatest"     %%  "scalatest"       % "3.2.2"       % "test"
  )
}

scala.sbt

scalaVersion := "2.13.1"

scalacOptions ++= Seq(
  "-deprecation",
  "-unchecked",
  "-Xfatal-warnings",
  "-feature",
  "-language:_"
)

appilication.conf

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # Options: ERROR, WARNING, INFO, DEBUG
  loglevel = "DEBUG"
}

reference.conf

helloWorld {
    timer=5000
}

BootHello.scala

package aia.deploy


import akka.actor.{ Props, ActorSystem }
import scala.concurrent.duration._

object BootHello extends App {

  val system = ActorSystem("hello-kernel")

  val actor = system.actorOf(Props[HelloWorld])
  val config = system.settings.config
  val timer = config.getInt("helloWorld.timer")
  system.actorOf(Props(
      new HelloWorldCaller(
        timer = timer millis,
        actor = actor)))
}

HelloWorld.scala

package aia.deploy

import akka.actor.{Actor, ActorLogging, ActorRef}

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._


class HelloWorld extends Actor
  with ActorLogging {

  def receive: Receive = {
    case msg: String  =>
      val hello = "Hello %s".format(msg)
      sender() ! hello
      log.info("Sent response {}",hello)
  }
}

class HelloWorldCaller(timer: FiniteDuration, actor: ActorRef)
  extends Actor with ActorLogging {

  case class TimerTick(msg: String)

  override def preStart(): Unit = {
    super.preStart()
    implicit val ec: ExecutionContextExecutor = context.dispatcher
    context.system.scheduler.scheduleAtFixedRate(
      timer,
      timer,
      self,
      TimerTick("everybody"))
  }

  def receive: Receive = {
    case msg: String  => log.info("received {}",msg)
    case tick: TimerTick => actor ! tick.msg
  }
}

実行結果

[INFO] [11/30/2020 17:47:30.471] [hello-kernel-akka.actor.default-dispatcher-6] [akka://hello-kernel/user/$a] Sent response Hello everybody
[INFO] [11/30/2020 17:47:30.471] [hello-kernel-akka.actor.default-dispatcher-7] [akka://hello-kernel/user/$b] received Hello everybody
[INFO] [11/30/2020 17:47:35.439] [hello-kernel-akka.actor.default-dispatcher-5] [akka://hello-kernel/user/$a] Sent response Hello everybody
[INFO] [11/30/2020 17:47:35.439] [hello-kernel-akka.actor.default-dispatcher-7] [akka://hello-kernel/user/$b] received Hello everybody
[INFO] [11/30/2020 17:47:40.439] [hello-kernel-akka.actor.default-dispatcher-5] [akka://hello-kernel/user/$a] Sent response Hello everybody
[INFO] [11/30/2020 17:47:40.439] [hello-kernel-akka.actor.default-dispatcher-7] [akka://hello-kernel/user/$b] received Hello everybody
[INFO] [11/30/2020 17:47:45.438] [hello-kernel-akka.actor.default-dispatcher-7] [akka://hello-kernel/user/$a] Sent response Hello everybody
[INFO] [11/30/2020 17:47:45.438] [hello-kernel-akka.actor.default-dispatcher-5] [akka://hello-kernel/user/$b] received Hello everybody
........................
........................

REST API サンプルコード

イベントとそのチケット数の登録・チケット購入システム

プロジェクトフォルダでサーバ起動

$ sbt run
INFO  [Slf4jLogger]: Slf4jLogger started
Success
INFO  [go-ticks]: RestApi bound to /0:0:0:0:0:0:0:0:5000 

イベントRHCPのチケットを10枚ポスト

$ http POST localhost:5000/events/RHCP tickets:=10
HTTP/1.1 201 Created
Content-Length: 28
Content-Type: application/json
Date: Thu, 03 Dec 2020 04:45:27 GMT
Server: GoTicks.com REST API

{
    "name": "RHCP",
    "tickets": 10
}

イベントDjMadlibのチケットを15枚ポスト

$ http POST localhost:5000/events/DjMadlib tickets:=15
HTTP/1.1 201 Created
Content-Length: 32
Content-Type: application/json
Date: Thu, 03 Dec 2020 04:45:51 GMT
Server: GoTicks.com REST API

{
    "name": "DjMadlib",
    "tickets": 15
}

イベント詳細確認

$ http GET localhost:5000/events
HTTP/1.1 200 OK
Content-Length: 74
Content-Type: application/json
Date: Thu, 03 Dec 2020 04:46:42 GMT
Server: GoTicks.com REST API

{
    "events": [
        {
            "name": "DjMadlib",
            "tickets": 15
        },
        {
            "name": "RHCP",
            "tickets": 10
        }
    ]
}

イベントRHCPのチケットを2枚購入

$ http POST localhost:5000/events/RHCP/tickets tickets:=2
HTTP/1.1 201 Created
Content-Length: 46
Content-Type: application/json
Date: Thu, 03 Dec 2020 04:46:59 GMT
Server: GoTicks.com REST API

{
    "entries": [
        {
            "id": 1
        },
        {
            "id": 2
        }
    ],
    "event": "RHCP"
}

イベント詳細確認(RHCPのチケット数確認)

$ http GET localhost:5000/events
HTTP/1.1 200 OK
Content-Length: 73
Content-Type: application/json
Date: Thu, 03 Dec 2020 04:47:35 GMT
Server: GoTicks.com REST API

{
    "events": [
        {
            "name": "DjMadlib",
            "tickets": 15
        },
        {
            "name": "RHCP",
            "tickets": 8
        }
    ]
}

“onComplete” to handle success and failure

Scalaシェル(REPL)

scala> :paste
// Entering paste mode (ctrl-D to finish)

import scala.util._  // Imports statement for Try, Success, and Failure
import scala.concurrent._
import ExecutionContext.Implicits.global

val futureFail = Future { throw new Exception("error!")}
futureFail.onComplete {
case Success(value) => println(value)
case Failure(e) => println(e)
}

// Exiting paste mode, now interpreting.

Exceptionによるエラー警告表示

java.lang.Exception: error!

REST APIサンプルコードの分割処理

負荷分散・拡張性を目的としたリモートノードバックエンドとして配置

シングルノードによる処理からノード分割による処理

IntelliJsbtシェルRunメニューから
FrontendRemoteDeployMain
を実行

[IJ]sbt:goticks> run
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list

Multiple main classes detected, select one to run:

 [1] com.goticks.BackendMain
 [2] com.goticks.BackendRemoteDeployMain
 [3] com.goticks.FrontendMain
 [4] com.goticks.FrontendRemoteDeployMain
 [5] com.goticks.FrontendRemoteDeployWatchMain
 [6] com.goticks.SingleNodeMain

Enter number: 4

[info] running com.goticks.FrontendRemoteDeployMain 
[DEBUG] [12/07/2020 23:26:00.983] [run-main-0] [EventStream] StandardOutLogger started
INFO  [Slf4jLogger]: Slf4jLogger started
[DEBUG] [12/07/2020 23:26:01.872] [run-main-0] [EventStream(akka://frontend)] logger log1-Slf4jLogger started
[DEBUG] [12/07/2020 23:26:01.874] [run-main-0] [EventStream(akka://frontend)] Default Loggers started
WARN  [RemoteActorRefProvider]: Using the 'remote' ActorRefProvider directly, which is a low-level layer. For most use cases, the 'cluster' abstraction on top of remoting is more suitable instead.
WARN  [RemoteActorRefProvider]: Akka Cluster not in use - Using Akka Cluster is recommended if you need remote watch and deploy.
INFO  [ArteryTcpTransport]: Remoting started with transport [Artery tcp]; listening on address [akka://[email protected]:25520] with UID [8779393747958428879]
Success
INFO  [go-ticks]: RestApi bound to /0:0:0:0:0:0:0:0:5000 

フロントエンドとバックエンドが起動するので、同じくIntelliJのターミナルからhttpユーティリティにより、イベントとそのチケット数のPOSTや、GETにてイベントの確認などをして下さい。

$ http POST localhost:5000/events/RHCP tickets:=10
HTTP/1.1 201 Created
Content-Length: 28
Content-Type: application/json
Date: Mon, 07 Dec 2020 14:32:23 GMT
Server: GoTicks.com REST API

{
    "name": "RHCP",
    "tickets": 10
}
$ http GET localhost:5000/events/RHCP
HTTP/1.1 200 OK
Content-Length: 28
Content-Type: application/json
Date: Mon, 07 Dec 2020 14:33:38 GMT
Server: GoTicks.com REST API

{
    "name": "RHCP",
    "tickets": 10
}

Akkaクラスター

Multi JVM Testing

Multi Node Testing

Akkaクラスタードキュメント

akka-cluster-jobprocess

akka-cluster-process

サンプルコード開発環境変更

  • Java SDK 11.0.9
  • Scala:2.13.4
  • Akka:2.6.10
  • sbt:1.3.13

旧リモート設定(TCP)は非推奨のため、Arteryリモート設定(UDP)へ変更

Classic Remoting

Artery Remoting

移行ガイド

Split Brain Resolver

Downing

Akkaクラスターサンプルコード

メッセージとして以下のListを送信し、Workerが各単語の合計数をカウントする。

List("this is a test ", "this is a test", "this is", "this")

タスクフロー
deploy-job-workers

jobworker-enlist

jobworker-send-task

return-task-result

terminate-worker

変更箇所
最新のSDKで動作確認できるようコード修正、ファイル追加など

実行(test2は1回のみカウント、test3はListを重複させ100回カウント、test4は”FAIL”検出によるエラー処理テスト)

$ sbt test
[info] welcome to sbt 1.3.13 (AdoptOpenJDK Java 11.0.9.1)
[info] loading global plugins from /home/user/.sbt/1.0/plugins
[info] loading settings for project chapter-cluster-build from plugins.sbt ...
[info] loading project definition from /home/user/IdeaProjects/akka-in-action/chapter-cluster/project
[info] loading settings for project chapter-cluster from scala.sbt,build.sbt ...
[info] set current project to words-cluster (in build file:/home/user/IdeaProjects/akka-in-action/chapter-cluster/)
INFO  [Slf4jLogger]: Slf4jLogger started
INFO  [TestReceptionist]: Received job test2
...
...

INFO  [TestReceptionist]: Job test2 complete.
INFO  [TestReceptionist]: result:Map(this -> 4, is -> 3, a -> 2, test -> 2)

...
...
INFO  [TestReceptionist]: Job test3 complete.
INFO  [TestReceptionist]: result:Map(this -> 800, is -> 600, a -> 400, test -> 400)
INFO  [TestReceptionist]: Received job test4

...
ERROR [TestReceptionist]: Job Master Actor[akka://test/user/receptionist/master-test4#-490024209] terminated before finishing job.
ERROR [TestReceptionist]: Job test4 failed.

...
INFO  [TestReceptionist]: Job test4 complete.
INFO  [TestReceptionist]: result:Map(this -> 1, is -> 1, a -> 1, test -> 1)

...
[info] LocalWordsSpec:
[info] The words system
[info] - must count the occurrence of words in a text
[info] - must count many occurences of words in a text
[info] - must continue to process a job with intermittent failures
[info] Run completed in 3 seconds, 285 milliseconds.
[info] Total number of tests run: 3
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 8 s, completed Dec 12, 2020, 10:30:53 PM

設定ファイルによる各ノード(シードノード、マスターノード、ワーカーノード)の動作確認

sbt-assemblyによりjarファイル作成(target/words-node.jar)

$ sbt assemply

ポートと設定ファイルを指定してノード起動(別ターミナルで起動すること)

$ java -DPORT=2551 -Dconfig.resource=/seed.conf -jar target/words-node.jar

$ java -DPORT=2554 -Dconfig.resource=/master.conf -jar target/words-node.jar

$ java -DPORT=2555 -Dconfig.resource=/worker.conf -jar target/words-node.jar

$ java -DPORT=2556 -Dconfig.resource=/worker.conf -jar target/words-node.jar