)

NomoLog

Log of my life and program

Scala + Akka

Posted in Scala Tagged as akka

Scalaのakkaを試してみた

akka

そういやScala始めてしばらくになるけどakka触ったことなかったなあこれはやばいなあということでちょっと勉強してみました。

akkaって?

Scala2.10から公式に標準装備された並列分散処理のためのScala, Java用フレームワークだそうです。

akka公式サイト

じゃあ使ってみよう

akka公式サイトによると最新バージョンは2.3.1だそうな、ふむふむ…さっそくGettinng Startedをやってみるか

Akka 「Akkaを勉強する一番いい方法はTypesafe Activatorをダウンロードしてテンプレートを試すことだよ^^ 」

ぼく 「あ、そうなの?じゃあそれ試してみよう」

一時間後…

1
2
3
4
5
6
7
8
9
10
11
12

        ____
      /      \ 
     /  ─    ─   \  全然わからねえ...
   /    (○)  (○) \
    |       (__人__) |   ___________
   \      ` ⌒´  ,/ .| |          |
   ノ           \ .| |          |
. /´                | |             |
 |    l            | |          |
 ヽ    -一ー_~、⌒)^),-、    .|_|___________|
  ヽ ____,ノγ⌒ヽ)ニニニニ      _|_|__|_

なんだよソースだけ渡されてもわかんねーよ!!もっと言葉で伝えてくれなきゃわかんねーよオレを誰だと思ってんだ!!ヾ(`Д´)ノ”

てな感じで悪戦苦闘してググるも、なかなかよい情報が見つからず…

で、よくよくみたらakka 2.0.2ではπを求める計算をサンプルに非常に丁寧に説明されていた…このチュートリアルでいいじゃん…なんでTypesafe Activatorとかいうのインストールさせたんだよクソが…

とか思いつつバージョンが微妙に古いので上手く行かなかったりで適宜書き換えつつ、サンプルを使ってお勉強しました。なるほど、大まかな流れはつかめたぞ!

個人的にポイントだなと感じたのは以下の点です!akka 2.0.2を読みながら(むしろそちらメインで)読んでいただくと非常にわかりやすいかと!

  1. Actor
  2. メッセージパッシング
  3. ルータ

Actor

akkaの並列処理ではActorというものを使うそうです。Actorとは、Javaの並列処理というで使うスレッドをイメージしてもらえればオッケーかと思います。ただしActorを特徴づけるのが、Actorがお互いにメッセージのやりとりをして処理を進めていく点です。

メッセージパッシング

Actorはメッセージのやりとりをして処理を進めていくと書きましたが、そのための仕組みがメッセージパッシングです。ちょっとサンプルソースを見てみましょう。(今気づいたけどPiApproximationはPiMessage継承してないけどいいのかな…一応動いちゃいるけども)

1
2
3
4
5
sealed trait PiMessage
case object Calculate extends PiMessage
case class Work(start: Int, nrOfElements: Int) extends PiMessage
case class Result(value: Double) extends PiMessage
case class PiApproximation(pi: Double, duration: Duration)

上記ではπの計算に使うメッセージを定義しています。パターンマッチで使うのでcase classで宣言しているようです。 それぞれのメッセージはそれぞれ次のActor間で使用されます。

Actor 役割
Worker 実際に計算を行なう
Master 処理の起点、Workerの計算結果をまとめる
Listener 結果をprintする

こちらがActorの定義です。

1
2
3
4
5
6
7
8
9
10
11
12
13
class Worker extends Actor {

 def calculatePiFor(start:Int, nrOfElements:Int):Double = {
   var acc = 0.0
   for (i <- start until (start+nrOfElements))
     acc += 4.0 * (1- (i%2) * 2) / (2 * i + 1)
   acc
 }
 def receive = {
  case Work(start, nrOfElements) =>
   sender ! Result(calculatePiFor(start,nrOfElements))
  }
}

各ActorはActorクラスを継承して定義します。 注目して欲しいのがreceiveメソッドです。receiveメソッドはそのActorがなんらかのメッセージを受け取った時に呼ばれるメソッドです。パターンマッチを使用でき、 メッセージの種類に応じて処理を変えることができます。

メッセージの送信は、!を使って

1
master ! culculate

のように行います。master Actorにculculateメッセージを送るという意味です。

メッセージを受け取ってから処理を送信者へ返す場合はsenderが予約されているのでそれを使いましょう。

ちなみにメッセージは無理に定義しなくてもIntとかStringとかも使えるようです。

ルータ

Master内でこのルータを宣言しています。ルータはそれぞれのWorkerに等しい量のタスクを振り分けるのに使われているようです。

この辺はもうちょっと勉強がいりますが、要はMasterがWorkerを作り各Workerに処理を振り分けているということだと思います。

1
2
3
4
5
6
7
8
9
10
11
12
class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, listener: ActorRef) extends Actor {
  var pi: Double = _
  var nrOfResults: Int = _
  val start: Long = System.currentTimeMillis
      
  val workerRouter = context.actorOf(Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
              
  def receive = {
    // handle messages ...
  }
                                  
}

全体の流れ

全体の流れはこんな感じです。

  1. 処理に必要なActorを定義する(今回はMaster,Worker,Listener)
  2. Actor間のやりとりに必要なメッセージを定義する
  3. Masterが処理の起点、各Workerの処理を割り振る
  4. Workerの処理が全て終わったら結果をマージ、Listenerへ投げる
  5. Listenerで結果表示(Listenerってなんでいるんだろう…?)

以下が全ソースコードです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import akka.actor._
import akka.routing._

sealed trait PiMessage
case object Calculate extends PiMessage
case class Work(start:Int, nrOfElements:Int) extends PiMessage
case class Result(value:Double) extends PiMessage
case class PiApproximation(pi:Double, duration:Long)

class Worker extends Actor {

  def calculatePiFor(start:Int, nrOfElements:Int):Double = {
    var acc = 0.0
    for (i <- start until (start+nrOfElements))
      acc += 4.0 * (1- (i%2) * 2) / (2 * i + 1)
    acc
  }
  def receive = {
    case Work(start, nrOfElements) =>
      sender ! Result(calculatePiFor(start,nrOfElements))
  }
}

class Master(nrOfWorkers:Int,nrOfMessages:Int, nrOfElements:Int, Listener:ActorRef) extends Actor {
  var pi:Double = _
  var nrOfResults:Int = _
  val start:Long = System.currentTimeMillis

  val workersRouter = context.actorOf(Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workersRouter")

  def receive = {
    case Calculate =>
      for (i <- 0 until nrOfMessages) workersRouter ! Work(i * nrOfElements, nrOfElements)
    case Result(value) =>
      pi += value
      nrOfResults += 1
      if (nrOfResults == nrOfMessages) {
        Listener ! PiApproximation(pi, duration = (System.currentTimeMillis- start))
        context.stop(self)
      }
  }
}

class Listener extends Actor {
  def receive = {
    case PiApproximation(pi, duration) =>
      println("Pi approximation: %s\nCalculation time: %s".format(pi,duration))
      context.system.shutdown()
  }
}
object AkkatestApp{
  def main(args:Array[String]):Unit  = {
    val input = readLine("1:sequence 2:parallel\n")
    if (input == "1")
      calculate(nrOfWorkers = 1, nrOfElements = 10000, nrOfMessages = 10000)
    else if (input == "2")
      calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
    else
      println("wrong num")

  }
  def calculate(nrOfWorkers:Int, nrOfElements:Int, nrOfMessages:Int) {
    val system = ActorSystem("PiSystem")

    val listener = system.actorOf(Props[Listener], name = "listener")

    val master = system.actorOf(Props(new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener)), name = "master")

    master ! Calculate
  }
}

まとめ

こちらが今回参考にさせていただいたサイトです。

SclaのAkka入門

Scalaの並行処理とアクター、並列コレクション

Scala + akka で簡単なチャットサーバを書いてみたので開設してみるよ

Scalaのakka actorsを使いはじめる

まだまだakkaの入り口に立った程度ですがこの程度のことでも情報がなくてかなり時間がかかってしまった…

もっと頑張って勉強します!!

Comments