Submitting Spark Jobs with SparkLauncher
2022-08-10 15:35:00 [Perkinl]
1. Background
While learning Spark, the material I read describes two main ways (that I know of) to submit a Spark job. The first is to submit from the command line with the spark-submit tool that ships with Spark; the official site and most references submit jobs this way. An example submit command:
./spark-submit \
  --class com.learn.spark.SimpleApp \
  --master yarn \
  --deploy-mode client \
  --driver-memory 2g \
  --executor-memory 2g \
  --executor-cores 3 \
  ../spark-demo.jar
The second way is to submit programmatically through the Java API: no command line is needed, you can simply click Run in IDEA on the main class that contains the job. Spark provides an API for this with SparkLauncher as the single entry point. This is very convenient (imagine a task that has to run repeatedly, written by someone who cannot write a Linux shell script; my answer is to submit the job through the Java API, which can also be integrated with Spring so the application runs inside Tomcat). The official example: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
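For reference, the demo in that package summary is essentially the following minimal launcher (reproduced roughly from memory, so treat the jar path and main class as placeholders rather than the exact official text):

import org.apache.spark.launcher.SparkLauncher;

public class MyLauncher {
    public static void main(String[] args) throws Exception {
        // Launch the Spark application as a child process and wait for it to exit.
        Process spark = new SparkLauncher()
                .setAppResource("/my/app.jar")        // placeholder jar path
                .setMainClass("my.spark.app.Main")    // placeholder main class
                .setMaster("local")
                .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
                .launch();
        spark.waitFor();
    }
}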
2. Purpose of this article
Why write this article when the official site already has a demo and API docs? Because the official demo would not run on my machine. The symptom was that the program ended with no output at all, or printed JAVA_HOME is not set, even though I had called the method to set it, to no effect. So here I am recording the demos that actually ran, after some searching plus some thinking of my own.
3. Demos
Based on the official example, there are two approaches:
The first is to call startApplication() on a SparkLauncher instance. Even with every configuration correct, though, running it this way can still fail. The reason is that startApplication() starts a LauncherServer process to interact with the cluster, and this appears to be asynchronous, so the main thread may exit before that process has even come up, and the run fails. The fix is either to put the main thread to sleep for a while after calling new SparkLauncher().startApplication(), or to use the example below:
package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;

public class LauncherAppV {

    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap<String, String> env = new HashMap<>();
        // These two variables must be set.
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        // Optional:
        //env.put("YARN_CONF_DIR", "");

        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Calling setJavaHome() did not help; the "JAVA_HOME is not set" error
        // persisted, hence JAVA_HOME is set through the environment map above.
        SparkAppHandle handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true)
                .startApplication(new SparkAppHandle.Listener() {
                    // Listen for state changes; when the application ends (for
                    // whatever reason), isFinal() returns true, otherwise false.
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        System.out.println("state:" + sparkAppHandle.getState().toString());
                    }

                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        System.out.println("Info:" + sparkAppHandle.getState().toString());
                    }
                });
        System.out.println("The task is executing, please wait ....");
        // Block the main thread until the application reaches a final state.
        countDownLatch.await();
        System.out.println("The task is finished!");
    }
}
Note: with cluster deploy mode, any standard output in your application code will not be visible; write the results to HDFS instead. With client mode the output can be seen.
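As a minimal illustration of that note, a SimpleApp along the following lines could persist its result to HDFS instead of printing it (a sketch only; the HDFS paths and the transformation are placeholders, not the actual contents of spark-demo.jar):

package com.learn.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SimpleApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SimpleApp");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Placeholder input; any transformation result works the same way.
        JavaRDD<String> lines = sc.textFile("hdfs:///tmp/input.txt");
        JavaRDD<String> upper = lines.map(String::toUpperCase);
        // In cluster mode System.out is not visible locally, so save the
        // result to HDFS where it can be inspected afterwards.
        upper.saveAsTextFile("hdfs:///tmp/spark-demo-output");
        sc.stop();
    }
}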
The second approach: obtain a Process via SparkLauncher.launch(), then call process.waitFor() to wait for it to finish. With this approach you have to manage the process's output yourself, which is more tedious, but the benefit is that everything is in your hands: the output you capture is exactly as detailed as what a command-line submit prints. The implementation:
package com.learn.spark;

import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

public class LauncherApp {

    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap<String, String> env = new HashMap<>();
        // These two variables must be set.
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        //env.put("YARN_CONF_DIR", "");

        SparkLauncher handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true);

        Process process = handle.launch();
        // Drain stdout and stderr on separate threads so the child process
        // never blocks on a full pipe buffer.
        InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
        Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
        inputThread.start();

        InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
        Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
        errorThread.start();

        System.out.println("Waiting for finish...");
        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code:" + exitCode);
    }
}
The custom InputStreamReaderRunnable class it uses is implemented as follows:
package com.learn.spark;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

/**
 * Reads an InputStream line by line on its own thread and echoes it to stdout.
 */
public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;
    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
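If your Spark version is recent enough (the redirect methods appeared around Spark 2.x; check the SparkLauncher javadoc for your version), the stream-draining threads can be avoided altogether by letting the launcher redirect the child process's output to files. A sketch, where the class name LauncherAppRedirect and the log paths are placeholders of mine:

package com.learn.spark;

import org.apache.spark.launcher.SparkLauncher;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;

public class LauncherAppRedirect {
    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap<String, String> env = new HashMap<>();
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");

        Process process = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                // Redirect the child's stdout/stderr to files (placeholder
                // paths) instead of draining them on our own threads.
                .redirectOutput(new File("/tmp/spark-submit-stdout.log"))
                .redirectError(new File("/tmp/spark-submit-stderr.log"))
                .launch();

        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code:" + exitCode);
    }
}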