使用akka框架和scala语言编写简单的RPC通信案例

[复制链接]
yizhou 发表于 2023-9-5 18:23:11|来自:湖北 | 显示全部楼层 |阅读模式
前言
1)akka框架是一个并发的、分布式的、可伸缩性的、高性能的RPC通信框架,大数据开发框架Spark、flink底层原理中或多或少都用到了


2)scala语言真的很强大、好用、方便,结合了面向对象语言和函数式语言的特点
akka的原理图




大多数分布式框架或工具 都遵循着主从节点的架构设计,在这里我们暂不考虑高可用的模式(高可用可参考文章Zookeeper之HDFS-HA高可用模式)
每个机器上的一个进程中只存在着1个通信角色对象  ActorSystem  ,也就是说 ActorSystem 对象的示例只有一个,但由它创建的Master和Worker可以有多个,是多例
1)启动master   内部定时器定期检查有无超时连接(就是在一定时间内没有向我发送心跳的worker),并将失效的进行移除
2)启动worker,跟master建立网络连接,将自己的信息(workerid,内存,内核数cpu等信息)发给master进行注册
3)master收到注册信息,将注册的信息进行保存到内存(高效),也可以持久化到磁盘或zookeeper当中(数据安全),之后向worker发送注册成功的信息
4)worker收到master发来的注册成功的信息,很高兴,并启动定时器,定期发送心跳,向master报活
代码实现

Worker类代码:
  1. import java.util.UUID
  2. import java.util.concurrent.TimeUnit
  3. import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
  4. import com.typesafe.config.ConfigFactory
  5. import scala.concurrent.duration._
  6. /**
  7. * @author:tom
  8. * @Date:Created in 16:49 2020/12/18
  9. */
  10. class Worker extends Actor {
  11.   var masterRef: ActorSelection = _
  12.   var workerId = UUID.randomUUID().toString
  13.   //在执行构造函数(实例化对象)之后、receive方法执行之前一定会执行一次
  14.   override def preStart(): Unit = {
  15.     //向master 进行注册信息
  16.     //可以与master建立连接
  17.     masterRef = context.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/user/MasterActor")
  18.     //发送消息
  19.     masterRef ! RegisterWorker(workerId, "2048", 4)
  20.   }
  21.   override def receive: Receive = {
  22.     //自己给自己发送的周期消息
  23.     case SendHeartbeat => {
  24.       //      if () {
  25.       //
  26.       //      } 向Master发送心跳
  27.       masterRef ! HeartBeat(workerId)
  28.     }
  29.     case RegisteredWorker => {
  30.       //      println("a response from master")
  31.       //启动一个定时器
  32.       import context.dispatcher
  33.       context.system.scheduler.schedule(Duration(0, TimeUnit.MILLISECONDS), 10000.millisecond, self, SendHeartbeat)
  34.     }
  35.   }
  36. }
  37. object Worker {
  38.   def main(args: Array[String]): Unit = {
  39.     val host = "localhost"
  40.     val port = 9999
  41.     val configStr =
  42.       s"""
  43.          |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
  44.          |akka.remote.netty.tcp.hostname = $host
  45.          |akka.remote.netty.tcp.port = $port
  46.          |""".stripMargin
  47.     val config = ConfigFactory.parseString(configStr)
  48.     //创建workerActorSystem
  49.     val workerActorSystem = ActorSystem.apply("workerActorSystem", config)
  50.     //创建workerActor
  51.     val workerActor = workerActorSystem.actorOf(Props(new Worker), "WorkerActor")
  52.   }
  53. }
复制代码
Master代码:
  1. import akka.actor.{Actor, ActorSystem, Props}
  2. import com.typesafe.config.ConfigFactory
  3. import scala.collection.mutable
  4. import scala.concurrent.duration._
  5. /**
  6. * @author:tom
  7. * @Date:Created in 16:08 2020/12/18
  8. */
  9. class Master extends Actor {
  10.   //定义一个可变的HashMap集合用来存储worker的信息
  11.   val idToWorker = new mutable.HashMap[String, WorkerInfo]()
  12.   //master定期检查自己  是否有新的节点(worker出现)
  13.   override def preStart(): Unit = {
  14.     import context.dispatcher
  15.     context.system.scheduler.schedule(0 millisecond, 15000.millisecond, self, CheckTimeOutWorker)
  16.   }
  17.   //用来接收消息
  18.   override def receive: Receive = {
  19.     //模式匹配
  20.     case "hello" => {
  21.       println("hello~")
  22.     }
  23.     case "hi" => {
  24.       println("hi~")
  25.     }
  26.     //定时检查
  27.     case CheckTimeOutWorker => {
  28.       val deadWorkers = idToWorker.values.filter(w => System.currentTimeMillis() - w.lastHeartbeatTime > 30000)
  29.       deadWorkers.foreach(dw => {
  30.         idToWorker -= dw.workerId
  31.       })
  32.       println(s"current alive worker size:${idToWorker.size}")
  33.     }
  34.     //有worker来进行注册信息需要执行的逻辑
  35.     case RegisterWorker(workerId, memory, cores) => {
  36.       //      println(s"workerId:$workerId,memory:$memory,cores:$cores")
  37.       //worker 注册成功应该执行的逻辑
  38.       //将信息存入到内存集合当中
  39.       val workerInfo: WorkerInfo = new WorkerInfo(workerId, memory, cores)
  40.       idToWorker.put(workerId, workerInfo)
  41.       //返回一个注册成功的信息
  42.       sender() ! RegisteredWorker
  43.     }
  44.     //worker端发送过来的心跳信息
  45.     case HeartBeat(workerId) => {
  46.       //根据workerId到Map中查找对应的WorkerInfo
  47.       if (idToWorker.contains(workerId)) {
  48.         //如果存在 则取出信息
  49.         val workerInfo = idToWorker(workerId)
  50.         //更新上一次的心跳时间
  51.         workerInfo.lastHeartbeatTime = System.currentTimeMillis()
  52.       }
  53.     }
  54.   }
  55. }
  56. object Master {
  57.   def main(args: Array[String]): Unit = {
  58.     val host = "localhost"
  59.     val port = 8888
  60.     val configStr =
  61.       s"""
  62.          |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
  63.          |akka.remote.netty.tcp.hostname = $host
  64.          |akka.remote.netty.tcp.port = $port
  65.          |""".stripMargin
  66.     val config = ConfigFactory.parseString(configStr)
  67.     //创建一个ActorSystem实例(单例)
  68.     val masterActorSystem = ActorSystem("MasterActorSystem", config)
  69.     //创建一个Actor
  70.     val actor = masterActorSystem.actorOf(Props[Master], "MasterActor")
  71.     //自己给自己发消息
  72.     actor ! "hello"
  73.   }
  74. }
复制代码
更多学习、面试资料尽在微信公众号:Hadoop大数据开发
全部回复1 显示全部楼层
i163 发表于 2023-9-5 18:23:30|来自:湖北 | 显示全部楼层
已经typed了,否则消息类别多了不好调试

快速回帖

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则