当前位置 : 主页 > 编程语言 > 其它开发 >

Flink Kubernetes Application部署

来源:互联网 收集:自由互联 发布时间:2022-06-27
Flink Kubernetes Application部署环境 Flink 1.12.2 Docker 20.10.7 Kubernetes 1.20.2 JDK 1.8 K8s 配置 #创建flink的使用账户(账户名可以自定义,-n flink可以省略使用k8s默认的命名空间)kubectl create serviceaccount
Flink Kubernetes Application部署 环境

Flink 1.12.2
Docker 20.10.7
Kubernetes 1.20.2
JDK 1.8

K8s 配置
#创建flink的使用账户(账户名可以自定义,-n flink可以省略使用k8s默认的命名空间)
kubectl create serviceaccount flinkaccount -n flink
#对创建的用户赋予编辑权限(flink:flinkaccount是指想flink命名空间下的flinkaccount账户,可以使用default默认的命名空间)
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flinkaccount
Docker 镜像配置
FROM flink:1.12.2
RUN mkdir -p $FLINK_HOME/usrlib
#默认存放flink任务的jar在$FLINK_HOME/usrlib,k8s启动后会在这个文件夹寻找启动的jar
COPY ./application.jar $FLINK_HOME/usrlib/startup.jar
#打包Docker镜像,并推送镜像到远端仓库
docker build -t build:v1.0 .

docker tag build:v1.0 192.168.1.11/xxx/build:v1.0

docker push 192.168.1.11/xxx/build:v1.0

Java 代码

import io.fabric8.kubernetes.client.osgi.ManagedKubernetesClient;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
import java.util.*;
import java.util.concurrent.Executors;

@Slf4j
@Data
public final class StartupFlinkKubernetes {

    /**
     * K8s配置信息
     */
    private Configuration configuration;
    
    /**
     * K8s的客户端
     */
    private Fabric8FlinkKubeClient client;

    /**
     * K8s客户端
     */
    private ManagedKubernetesClient managedKubernetesClient;

    /**
     * 加载K8s的配置信息
     */
    private void loadConfig(){
        //加载一个默认K8s的配置
        configuration = GlobalConfiguration.loadConfiguration();

        //设置k8s的部署模式
        configuration.set(DeploymentOptions.ATTACHED,false);
        configuration.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());

        //设置应用的名称
        configuration.set(KubernetesConfigOptions.CLUSTER_ID, "applicationName");
        //设置Docker的远端镜像地址
        configuration.set(KubernetesConfigOptions.CONTAINER_IMAGE, "192.168.1.11/xxx/build:v1.0");

        //设置K8s的命名空间(部署应用的命名空间)(可选)
        configuration.set(KubernetesConfigOptions.NAMESPACE, "flink");
        //设置k8s使用的K8s账号(账号名称可自定义)
        configuration.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, "flinkaccount");

        //设置K8s对外暴露的端口方式,这里直接使用宿主机端口,可以在外面访问Flink的后台管理界面
        configuration.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,KubernetesConfigOptions.ServiceExposedType.NodePort);
        //设置K8s拉取Docker镜像的方式:Always 每次都拉取,Never 不拉取,IfNotPresent 本地没有再拉取
        configuration.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, KubernetesConfigOptions.ImagePullPolicy.Always);

        //设置镜像内启动Flink的jar包文件路径
        //仅支持file://开头的协议,且此jar包已经再Docker镜像中
        //Docker镜像的基础是Flink:1.12.0
        configuration.set(PipelineOptions.JARS, Collections.singletonList("file://"));

        //设置Flink的占用内存大小
        configuration.set(JobManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1024 mb"));
        //如果启动不起来,报错说是缺少配置可以在这里添加配置
        //具体的配置根据:flink/conf/flink-conf.yaml中的配置决定
        configuration.setString("taskmanager.memory.flink.size","1 gb");

        //千万别自己设置FLINK_CONF_DIR选项
        //这个选项会在启动K8s应用的时候读取宿主机上这个路径/conf的文件作为Docker镜像里面的Flink的conf
        //而且启动后执行Flink的环境是/opt/flink的不会使用这里配置的FLINK_CONF_DIR,从而导致启动应用失败
        //所以建议不要更改这个默认的选项,但可以在宿主机的这个路径上配置Flink的相关配置,到时候会被读取到部署的应用中
        //configuration.set(KubernetesConfigOptions.FLINK_CONF_DIR, flinkHome);
    }

    /**
     * 加载K8s客户端
     */
    private void loadClient(){
        //K8s客户端
        managedKubernetesClient = new ManagedKubernetesClient();
        //将之前的配置
        Map<String,String> toMap = configuration.toMap();
        //将之前设置的配置转换
        Map<String,Object> config = new HashMap<>(toMap.size());

        for (String key : toMap.keySet()){
            config.put(key,toMap.get(key));
        }
        //覆盖一下客户端的配置
        managedKubernetesClient.activate(config);
        //创建客户端
        client = new Fabric8FlinkKubeClient(configuration,managedKubernetesClient,() -> Executors.newFixedThreadPool(16));
    }

    /**
     * 启动任务
     * @param arg 启动参数
     * @throws Exception 启动异常
     */
    private void startApplication(String[] arg) throws Exception {
        //设置要运行的jar的class以及启动jar的参数
        ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(arg, "com.xxx.xxx");
        //创建k8s的描述器
        KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(configuration,client);
        //创建k8s信息
        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
        //部署应用启动到k8s
        ClusterClientProvider<String> provider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification,applicationConfiguration);
        //应用id
        String applicationId = provider.getClusterClient().getClusterId();
        //访问Flink Web的地址
        String webAddress = provider.getClusterClient().getWebInterfaceURL();
    }

}

【来源:国外高防服务器 http://www.558idc.com/stgf.html 欢迎留下您的宝贵建议】
上一篇:go Cobra命令行工具入门
下一篇:没有了
网友评论