SparkSubmit提交流程分析 tips:分析基于如下执行命令开始 ./spark-submit \--class org.apache.spark.examples.SparkPi \--master yarn \--deploy-mode cluster \./examples/jars/spark-example_2.12-3.0.0.jar \10 首先执行了spark
tips:分析基于如下执行命令开始
./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-example_2.12-3.0.0.jar \
10
首先执行了spark-submit这个脚本程序,找到这个脚本的代码
#!/usr/bin/env bash
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
#exec 调用spark-class脚本 然后传入SparkSubmit这个类 和 上面那一堆参数
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
然后我们去看spark-class这个脚本的代码(只关注重点版):
#!/usr/bin/env bash
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
. "${SPARK_HOME}"/bin/load-spark-env.sh
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ "$(command -v java)" ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
if [ -d "${SPARK_HOME}/jars" ]; then
SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
echo "You need to build Spark with the target \"package\" before running this program." 1>&2
exit 1
else
LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
#3.$RUNNER="${JAVA_HOME}/bin/java" 调用类路径中的org.apache.spark.launcher.Main类 参数为spark-submit指定的所有参数,在这里调用launcher生成下面jvm command
build_command() {
"$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
if [ "$CMD_START_FLAG" == "true" ]; then
#2.CMD在这个循环里一直做累加,这个循环通过build_command把参数准备好
CMD+=("$ARG")
else
if [ "$ARG" == $'\0' ]; then
DELIM=''
CMD_START_FLAG="true"
elif [ "$ARG" != "" ]; then
echo "$ARG"
fi
fi
done < <(build_command "$@")
#1。我们执行了一个cmd,这个cmd从哪儿来的
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
最后执行的cmd:
/usr/lib/java/jdk1.8.0_144/bin/java -cp \
/home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2/conf/:/home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2/jars/* \
-Xmx1g \
org.apache.spark.deploy.SparkSubmit \
--master yarn \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
./examples/jars/spark-example_2.12-3.0.0.jar
所以,spark提交脚本很关键的点在于org.apache.spark.deploy.SparkSubmit这个类是怎么运作的,其他的都是参数,我们就先看看这个类的代码:
//一个可以运行的类肯定有main方法,所以我们从main方法开始
override def main(args: Array[String]): Unit = {
//new 了一个sparksubmit的匿名内部类
val submit = new SparkSubmit() {
self =>
override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
new SparkSubmitArguments(args) {
override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
override protected def logError(msg: => String): Unit = self.logError(msg)
}
}
override protected def logInfo(msg: => String): Unit = printMessage(msg)
override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")
//所以是执行了这个方法,这个方法又调用的父类的doSubmit(args)
override def doSubmit(args: Array[String]): Unit = {
try {
super.doSubmit(args)
} catch {
case e: SparkUserAppException =>
exitFn(e.exitCode)
}
}
}
//然后用匿名内部类执行了一个dosubmit方法,此方法在匿名内部类里已被重写
submit.doSubmit(args)
}
1.从super.dosubmit开始的提交流程
def doSubmit(args: Array[String]): Unit = {
// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
// be reset before the application starts.
// 这个是日志,暂且不看
val uninitLog = initializeLogIfNecessary(true, silent = true)
// *parseArguments这个方法返回了appArgs,作用在于解析参数
val appArgs = parseArguments(args)
if (appArgs.verbose) {
logInfo(appArgs.toString)
}
//这里模式匹配 appArgs.action属性一定在下面这四个之中,所以我们从parseArguments方法开始
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
1.1 parseArguments(args)
protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
//构造方法 执行了关键的1.1.1 和 1.1.2 两个东西
new SparkSubmitArguments(args)
}
1.1.1 SparkSubmitArguments(args)
try {
//代码块儿
parse(args.asJava)
} catch {
case e: IllegalArgumentException =>
SparkSubmit.printErrorAndExit(e.getMessage())
}
1.1.2 loadEnvironmentArguments()
1.1.1.1 parse(args.asJava)//很明显了嘛,在爪子嘛,在格式化输入的参数撒
protected final void parse(List<String> args) {
//这个就是分离参数的正则表达式
Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
int idx = 0;
for (idx = 0; idx < args.size(); idx++) {
String arg = args.get(idx);
String value = null;
Matcher m = eqSeparatedOpt.matcher(arg);
if (m.matches()) {
arg = m.group(1);
value = m.group(2);
}
// Look for options with a value.
String name = findCliOption(arg, opts);
if (name != null) {
if (value == null) {
if (idx == args.size() - 1) {
throw new IllegalArgumentException(
String.format("Missing argument for option '%s'.", arg));
}
idx++;
value = args.get(idx);
}
if (!handle(name, value)) {
break;
}
continue;
}
// Look for a switch.
name = findCliOption(arg, switches);
if (name != null) {
// * 这里就是参数解析的关键函数
if (!handle(name, null)) {
break;
}
continue;
}
if (!handleUnknown(arg)) {
break;
}
}
if (idx < args.size()) {
idx++;
}
handleExtraArgs(args.subList(idx, args.size()));
}
1.1.1.1.1 handle(name, null)
//看到这个模式匹配是不是一下就清晰了,找到这个参数,然后给属性赋值
override protected def handle(opt: String, value: String): Boolean = {
opt match {
// protected final String NAME = "--name";
case NAME =>
name = value
// protected final String MASTER = "--master";
case MASTER =>
master = value
// protected final String CLASS = "--class";
case CLASS =>
mainClass = value
case NUM_EXECUTORS =>
numExecutors = value
case TOTAL_EXECUTOR_CORES =>
totalExecutorCores = value
case EXECUTOR_CORES =>
executorCores = value
case EXECUTOR_MEMORY =>
executorMemory = value
case DRIVER_MEMORY =>
driverMemory = value
case DRIVER_CORES =>
driverCores = value
case _ =>
throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
}
true
}
1.1.2 loadEnvironmentArguments()
// Action should be SUBMIT unless otherwise specified
//第一次执行action为空 那么action赋值一定为submit
action = Option(action).getOrElse(SUBMIT)
1.2 submit(appArgs, uninitLog)
runMain(args, uninitLog)
1.2.1 runMain(args, uninitLog) 删除不重要的log版
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
// (childArgs, childClasspath, sparkConf, childMainClass)
// childMainClass =》 "org.apache.spark.deploy.yarn.YarnClusterApplication"
-- prepareSubmitEnvironment(args)
// classForName(childMainClass)
-- var mainClass: Class[_] = Utils.classForName(childMainClass)
// classOf[SparkApplication].isAssignableFrom(mainClass)
val app: SparkApplication =
-- a)mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
-- b)new JavaMainApplication(mainClass)
// "org.apache.spark.deploy.yarn.YarnClusterApplication"
app.start(childArgs.toArray, sparkConf)
}
1.2.1.1 prepareSubmitEnvironment(args) 删除不重要版
if (isYarnCluster) {
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
}
1.2.1.2 app.start(childArgs.toArray, sparkConf) 删除不重要版
override def start(args: Array[String], conf: SparkConf): Unit = {
// SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
// so remove them from sparkConf here for yarn mode.
conf.remove(JARS)
conf.remove(FILES)
// new ClientArguments(args) 解析传过来的参数 其中 --class => userClass = value =>自己执行的那个类
// private val yarnClient = YarnClient.createYarnClient
// YarnClient client = new YarnClientImpl();
// protected ApplicationClientProtocol rmClient; resourceManager的客户端说明这个client 是用来和resourceManager做交互的
// 对像明白了,接下来看看run里面都是些啥
new Client(new ClientArguments(args), conf, null).run()
}
1.2.1.2.1 rmClient.run() 删除不重要版
def run(): Unit = {
this.appId = submitApplication()
}
def submitApplication(): ApplicationId = {
try {
//启动了连接
launcherBackend.connect()
yarnClient.init(hadoopConf)
yarnClient.start()
// 从我们的 RM 获取新的应用程序
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
// 设置适当的上下文来启动我们的 AM 进程
// 创建容器
// commands = JAVA_HOME/bin/java org.apache.spark.deploy.yarn.ApplicationMaster
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)
// 最后,提交并监控申请
yarnClient.submitApplication(appContext)
launcherBackend.setAppId(appId.toString)
reportLauncherState(SparkAppHandle.State.SUBMITTED)
appId
} catch {
...
}
}