Flink Kubernetes Application部署环境 Flink 1.12.2 Docker 20.10.7 Kubernetes 1.20.2 JDK 1.8 K8s 配置 #创建flink的使用账户(账户名可以自定义,-n flink可以省略使用k8s默认的命名空间)kubectl create serviceaccount
Flink 1.12.2
Docker 20.10.7
Kubernetes 1.20.2
JDK 1.8
#创建flink的使用账户(账户名可以自定义,-n flink可以省略使用k8s默认的命名空间)
kubectl create serviceaccount flinkaccount -n flink
#对创建的用户赋予编辑权限(flink:flinkaccount是指想flink命名空间下的flinkaccount账户,可以使用default默认的命名空间)
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flinkaccount
Docker 镜像配置
FROM flink:1.12.2
RUN mkdir -p $FLINK_HOME/usrlib
#默认存放flink任务的jar在$FLINK_HOME/usrlib,k8s启动后会在这个文件夹寻找启动的jar
COPY ./application.jar $FLINK_HOME/usrlib/startup.jar
#打包Docker镜像,并推送镜像到远端仓库
docker build -t build:v1.0 .
docker tag build:v1.0 192.168.1.11/xxx/build:v1.0
docker push 192.168.1.11/xxx/build:v1.0
Java 代码
import io.fabric8.kubernetes.client.osgi.ManagedKubernetesClient;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
import java.util.*;
import java.util.concurrent.Executors;
@Slf4j
@Data
public final class StartupFlinkKubernetes {
/**
* K8s配置信息
*/
private Configuration configuration;
/**
* K8s的客户端
*/
private Fabric8FlinkKubeClient client;
/**
* K8s客户端
*/
private ManagedKubernetesClient managedKubernetesClient;
/**
* 加载K8s的配置信息
*/
private void loadConfig(){
//加载一个默认K8s的配置
configuration = GlobalConfiguration.loadConfiguration();
//设置k8s的部署模式
configuration.set(DeploymentOptions.ATTACHED,false);
configuration.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
//设置应用的名称
configuration.set(KubernetesConfigOptions.CLUSTER_ID, "applicationName");
//设置Docker的远端镜像地址
configuration.set(KubernetesConfigOptions.CONTAINER_IMAGE, "192.168.1.11/xxx/build:v1.0");
//设置K8s的命名空间(部署应用的命名空间)(可选)
configuration.set(KubernetesConfigOptions.NAMESPACE, "flink");
//设置k8s使用的K8s账号(账号名称可自定义)
configuration.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, "flinkaccount");
//设置K8s对外暴露的端口方式,这里直接使用宿主机端口,可以在外面访问Flink的后台管理界面
configuration.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,KubernetesConfigOptions.ServiceExposedType.NodePort);
//设置K8s拉取Docker镜像的方式:Always 每次都拉取,Never 不拉取,IfNotPresent 本地没有再拉取
configuration.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, KubernetesConfigOptions.ImagePullPolicy.Always);
//设置镜像内启动Flink的jar包文件路径
//仅支持file://开头的协议,且此jar包已经再Docker镜像中
//Docker镜像的基础是Flink:1.12.0
configuration.set(PipelineOptions.JARS, Collections.singletonList("file://"));
//设置Flink的占用内存大小
configuration.set(JobManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1024 mb"));
//如果启动不起来,报错说是缺少配置可以在这里添加配置
//具体的配置根据:flink/conf/flink-conf.yaml中的配置决定
configuration.setString("taskmanager.memory.flink.size","1 gb");
//千万别自己设置FLINK_CONF_DIR选项
//这个选项会在启动K8s应用的时候读取宿主机上这个路径/conf的文件作为Docker镜像里面的Flink的conf
//而且启动后执行Flink的环境是/opt/flink的不会使用这里配置的FLINK_CONF_DIR,从而导致启动应用失败
//所以建议不要更改这个默认的选项,但可以在宿主机的这个路径上配置Flink的相关配置,到时候会被读取到部署的应用中
//configuration.set(KubernetesConfigOptions.FLINK_CONF_DIR, flinkHome);
}
/**
* 加载K8s客户端
*/
private void loadClient(){
//K8s客户端
managedKubernetesClient = new ManagedKubernetesClient();
//将之前的配置
Map<String,String> toMap = configuration.toMap();
//将之前设置的配置转换
Map<String,Object> config = new HashMap<>(toMap.size());
for (String key : toMap.keySet()){
config.put(key,toMap.get(key));
}
//覆盖一下客户端的配置
managedKubernetesClient.activate(config);
//创建客户端
client = new Fabric8FlinkKubeClient(configuration,managedKubernetesClient,() -> Executors.newFixedThreadPool(16));
}
/**
* 启动任务
* @param arg 启动参数
* @throws Exception 启动异常
*/
private void startApplication(String[] arg) throws Exception {
//设置要运行的jar的class以及启动jar的参数
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(arg, "com.xxx.xxx");
//创建k8s的描述器
KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(configuration,client);
//创建k8s信息
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
//部署应用启动到k8s
ClusterClientProvider<String> provider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification,applicationConfiguration);
//应用id
String applicationId = provider.getClusterClient().getClusterId();
//访问Flink Web的地址
String webAddress = provider.getClusterClient().getWebInterfaceURL();
}
}
【来源:国外高防服务器 http://www.558idc.com/stgf.html 欢迎留下您的宝贵建议】