这样,你可以理解为在Driver端启动了一个RPC Server。要运行这段代码也非常简单,直接在主程序里运行即可:
- // parameter server should be enabled by default
- // Treat the PS as enabled unless "streaming.ps.enable" is explicitly set to false.
- if (!params.containsKey("streaming.ps.enable") || params.get("streaming.ps.enable").toString.toBoolean) {
- logger.info("ps enabled...")
- // Local mode: run the PS scheduler backend in-process on the driver.
- if (ss.sparkContext.isLocal) {
- localSchedulerBackend = new LocalPSSchedulerBackend(ss.sparkContext)
- localSchedulerBackend.start()
- } else {
- // Cluster mode: start the driver-side RPC backend that executors connect to.
- logger.info("start PSDriverBackend")
- psDriverBackend = new PSDriverBackend(ss.sparkContext)
- psDriverBackend.start()
- }
- }
这里我们需要实现local模式和cluster模式两种。
Driver启动了一个RPC Server,那么Executor端如何启动呢?Executor端似乎没有任何一个地方可以让我启动一个RPC Server? 其实有的,只是非常trick,我们知道Spark是允许自定义Metrics的,并且会调用用户实现的metric特定的方法,我们只要开发一个metric Sink,在里面启动RPC Server,骗过Spark即可。具体实现如下:
- // Spark Metrics Sink used as a hook: Spark instantiates every configured Sink
- // on each executor, so this class gives us a place to start an RPC server on
- // the executor side without modifying Spark source code.
- class PSServiceSink(val property: Properties, val registry: MetricRegistry,
- securityMgr: SecurityManager) extends Sink with Logging {
- def env = SparkEnv.get
-
- var psDriverUrl: String = null
- var psExecutorId: String = null
- var hostname: String = null
- var cores: Int = 0
- var appId: String = null
- // Hard-coded port — assumed to match the port the driver-side RPC server
- // listens on; TODO(review) confirm against the driver backend configuration.
- val psDriverPort = 7777
- var psDriverHost: String = null
- var workerUrl: Option[String] = None
- val userClassPath = new mutable.ListBuffer[URL]()
-
- // Recovers the driver host (and thus the driver RPC URL) by parsing the
- // executor JVM's launch command line.
- def parseArgs = {
- //val runtimeMxBean = ManagementFactory.getRuntimeMXBean();
- //var argv = runtimeMxBean.getInputArguments.toList
- // NOTE(review): split's argument is a regex — "s+" splits on the literal
- // character 's'; the intended pattern is almost certainly "\\s+" (whitespace).
- var argv = System.getProperty("sun.java.command").split("s+").toList
-
- .....
- psDriverHost = host
- psDriverUrl = "spark://ps-driver-endpoint@" + psDriverHost + ":" + psDriverPort
- }
-
- parseArgs
-
- // Builds the executor-side RpcEnv (clientMode = true on executors, since only
- // the driver uses DRIVER_IDENTIFIER) used to host the ps-executor endpoint.
- def createRpcEnv = {
- val isDriver = env.executorId == SparkContext.DRIVER_IDENTIFIER
- val bindAddress = hostname
- val advertiseAddress = ""
- // Port is configurable via spark.ps.executor.port; 0 lets the OS pick one.
- val port = env.conf.getOption("spark.ps.executor.port").getOrElse("0").toInt
- val ioEncryptionKey = if (env.conf.get(IO_ENCRYPTION_ENABLED)) {
- Some(CryptoStreamUtils.createKey(env.conf))
- } else {
- None
- }
- //logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}")
- RpcEnv.create("PSExecutorBackend", bindAddress, port, env.conf,
- env.securityManager, clientMode = !isDriver)
- }
-
- // Called by Spark's metrics system; spawns a background thread that registers
- // the PSExecutorBackend endpoint on executors only (skipped on the driver).
- override def start(): Unit = {
-
- new Thread(new Runnable {
- override def run(): Unit = {
- logInfo(s"delay PSExecutorBackend 3s")
- // Fixed 3s delay, presumably to let executor initialization finish
- // before connecting back to the driver — fragile; TODO(review) confirm.
- Thread.sleep(3000)
- logInfo(s"start PSExecutor;env:${env}")
- if (env.executorId != SparkContext.DRIVER_IDENTIFIER) {
- val rpcEnv = createRpcEnv
- val pSExecutorBackend = new PSExecutorBackend(env, rpcEnv, psDriverUrl, psExecutorId, hostname, cores)
- PSExecutorBackend.executorBackend = Some(pSExecutorBackend)
- rpcEnv.setupEndpoint("ps-executor-endpoint", pSExecutorBackend)
- }
- }
- }).start()
-
- }
- ...
- }
到这里,我们就能成功启动RPC Server,并且连接上Driver中的RPC Server。现在,你就可以在不修改Spark 源码的情况下,尽情的写通讯相关的代码了,让你可以更好的控制Executor。 (编辑:通化站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|