yizhou 发表于 2023-9-5 18:23:11

使用akka框架和scala语言编写简单的RPC通信案例

前言

1)akka框架是一个并发的、分布式的、可伸缩性的、高性能的RPC通信框架,大数据开发框架Spark、flink底层原理中或多或少都用到了
http://pic3.zhimg.com/v2-64a9b1417c2d4121a6a517bf98531a6e_r.jpg

2)scala语言真的很强大、好用、方便,结合了面向对象语言和函数式语言的特点
akka的原理图



http://pic2.zhimg.com/v2-f6b075db2d1ec65ffab2a7bbc1b3992d_r.jpg

大多数分布式框架或工具 都遵循着主从节点的架构设计,在这里我们暂不考虑高可用的模式(高可用可参考文章Zookeeper之HDFS-HA高可用模式)
每个机器上的一个进程中只存在着1个通信角色对象ActorSystem,也就是说 ActorSystem 对象的示例只有一个,但由它创建的Master和Worker可以有多个,是多例
1)启动master   内部定时器定期检查有无超时连接(就是在一定时间内没有向我发送心跳的worker),并将失效的进行移除
2)启动worker,跟master建立网络连接,将自己的信息(workerid,内存,内核数cpu等信息)发给master进行注册
3)master收到注册信息,将注册的信息进行保存到内存(高效),也可以持久化到磁盘或zookeeper当中(数据安全),之后向worker发送注册成功的信息
4)worker收到master发来的注册成功的信息,很高兴,并启动定时器,定期发送心跳,向master报活代码实现

Worker类代码:
import java.util.UUID
import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

/**
* @author:tom
* @Date:Created in 16:49 2020/12/18
*/
class Worker extends Actor {


var masterRef: ActorSelection = _

var workerId = UUID.randomUUID().toString

//在执行构造函数(实例化对象)之后、receive方法执行之前一定会执行一次
override def preStart(): Unit = {

    //向master 进行注册信息
    //可以与master建立连接
    masterRef = context.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/user/MasterActor")
    //发送消息
    masterRef ! RegisterWorker(workerId, "2048", 4)
}

override def receive: Receive = {
    //自己给自己发送的周期消息
    case SendHeartbeat => {
      //      if () {
      //
      //      } 向Master发送心跳
      masterRef ! HeartBeat(workerId)
    }

    case RegisteredWorker => {
      //      println("a response from master")

      //启动一个定时器
      import context.dispatcher
      context.system.scheduler.schedule(Duration(0, TimeUnit.MILLISECONDS), 10000.millisecond, self, SendHeartbeat)

    }
}
}

object Worker {

def main(args: Array): Unit = {
    val host = "localhost"
    val port = 9999
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = $host
         |akka.remote.netty.tcp.port = $port
         |""".stripMargin
    val config = ConfigFactory.parseString(configStr)
    //创建workerActorSystem
    val workerActorSystem = ActorSystem.apply("workerActorSystem", config)
    //创建workerActor
    val workerActor = workerActorSystem.actorOf(Props(new Worker), "WorkerActor")
}


}Master代码:
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
import scala.concurrent.duration._

/**
* @author:tom
* @Date:Created in 16:08 2020/12/18
*/
class Master extends Actor {

//定义一个可变的HashMap集合用来存储worker的信息
val idToWorker = new mutable.HashMap()


//master定期检查自己是否有新的节点(worker出现)
override def preStart(): Unit = {
    import context.dispatcher
    context.system.scheduler.schedule(0 millisecond, 15000.millisecond, self, CheckTimeOutWorker)
}

//用来接收消息
override def receive: Receive = {

    //模式匹配
    case "hello" => {
      println("hello~")
    }
    case "hi" => {
      println("hi~")
    }

    //定时检查
    case CheckTimeOutWorker => {
      val deadWorkers = idToWorker.values.filter(w => System.currentTimeMillis() - w.lastHeartbeatTime > 30000)
      deadWorkers.foreach(dw => {
      idToWorker -= dw.workerId
      })
      println(s"current alive worker size:${idToWorker.size}")
    }

    //有worker来进行注册信息需要执行的逻辑
    case RegisterWorker(workerId, memory, cores) => {
      //      println(s"workerId:$workerId,memory:$memory,cores:$cores")

      //worker 注册成功应该执行的逻辑

      //将信息存入到内存集合当中
      val workerInfo: WorkerInfo = new WorkerInfo(workerId, memory, cores)
      idToWorker.put(workerId, workerInfo)
      //返回一个注册成功的信息
      sender() ! RegisteredWorker
    }

    //worker端发送过来的心跳信息
    case HeartBeat(workerId) => {
      //根据workerId到Map中查找对应的WorkerInfo
      if (idToWorker.contains(workerId)) {
      //如果存在 则取出信息
      val workerInfo = idToWorker(workerId)
      //更新上一次的心跳时间
      workerInfo.lastHeartbeatTime = System.currentTimeMillis()
      }
    }

}
}

object Master {

def main(args: Array): Unit = {
    val host = "localhost"
    val port = 8888
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = $host
         |akka.remote.netty.tcp.port = $port
         |""".stripMargin
    val config = ConfigFactory.parseString(configStr)
    //创建一个ActorSystem实例(单例)
    val masterActorSystem = ActorSystem("MasterActorSystem", config)
    //创建一个Actor
    val actor = masterActorSystem.actorOf(Props, "MasterActor")
    //自己给自己发消息
    actor ! "hello"
}


}更多学习、面试资料尽在微信公众号:Hadoop大数据开发

i163 发表于 2023-9-5 18:23:30

已经typed了,否则消息类别多了不好调试
页: [1]
查看完整版本: 使用akka框架和scala语言编写简单的RPC通信案例