直接上干货
引入依赖
<properties><pdi.version>8.2.0.0-342</pdi.version>
</properties>
<!--kettle dependency start-->
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-dbdialog</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>metastore</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>org.pentaho</groupId>
<artifactId>pentaho-metadata</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-ui-swt</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-vfs-browser</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>org.pentaho.di.plugins</groupId>
<artifactId>kettle-sap-plugin-core</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>org.pentaho.di.plugins</groupId>
<artifactId>kettle-json-plugin-core</artifactId>
<version>${pdi.version}</version>
</dependency>
<!-- big data plugin start -->
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-big-data-api-hdfs</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-hadoop-shims-osgi-jaas</artifactId>
<version>8.2.0.0-342</version>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-big-data-legacy</artifactId>
<version>8.2.0.0-342</version>
<exclusions> <!-- 解决依赖冲突问题 下同 -->
<exclusion>
<groupId>hsqldb</groupId>
<artifactId>hsqldb</artifactId>
</exclusion>
<exclusion>
<groupId>libthrift</groupId><!-- hive报错,可能是这个依赖冲突 -->
<artifactId>libthrift</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-big-data-plugin</artifactId>
<version>${pdi.version}</version>
<type>pom</type>
<exclusions>
<exclusion>
<groupId>libthrift</groupId>
<artifactId>libthrift</artifactId>
</exclusion>
<exclusion>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-core</artifactId>
</exclusion>
<exclusion>
<groupId>pentaho</groupId>
<artifactId>pentaho-big-data-assemblies-pmr-libraries</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>5.1.0.Final</version>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-big-data-kettle-plugins-hdfs</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-big-data-impl-cluster</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
<version>4.3.1</version>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-big-data-impl-vfs-hdfs</artifactId>
<version>8.2.0.0-342</version>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-big-data-impl-shim-initializer</artifactId>
<version>8.2.0.0-342</version>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-big-data-impl-shim-hdfs</artifactId>
<version>8.2.0.0-342</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-big-data-impl-shim-common</artifactId>
<version>8.2.0.0-342</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.pentaho/pentaho-hadoop-shims-hdp30-package -->
<!-- 对应客户端软件里hadoop-configurations里面的hdp30配置 -->
<dependency>
<groupId>org.pentaho</groupId><!-- hdp30依赖,不同发行商引入不同的依赖 -->
<artifactId>pentaho-hadoop-shims-hdp30-package</artifactId>
<version>8.2.2018.11.00-342</version>
<type>zip</type>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.netbeans</groupId>
<artifactId>mof</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-xml</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>pentaho</groupId>
<artifactId>pentaho-big-data-impl-shim-shimTests</artifactId>
<version>8.2.0.0-342</version>
</dependency>
<!-- big data plugin end -->
<dependency>
<groupId>org.pentaho</groupId>
<artifactId>commons-xul-core</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>org.pentaho</groupId>
<artifactId>commons-xul-swt</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>org.pentaho</groupId>
<artifactId>commons-xul-swing</artifactId>
<version>${pdi.version}</version>
</dependency>
<dependency>
<groupId>org.pentaho.di.plugins</groupId>
<artifactId>pdi-xml-plugin-core</artifactId>
<version>${pdi.version}</version>
</dependency>
<!--kettle dependency end-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<!-- 将big data plugin引入需要用到的依赖,依赖中涉及osgi,BundleContext -->
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.framework</artifactId>
<version>4.2.1</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
<version>4.3.1</version>
<scope>provided</scope>
</dependency>
<!-- hadoop client相关依赖 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
复制代码
仓库设置
<repositories><repository>
<id>pentaho-public</id>
<name>Pentaho Public</name>
<url>https://nexus.pentaho.org/content/groups/omni/</url>
<releases>
<enabled>true</enabled>
<updatePolicy>daily</updatePolicy>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>interval:15</updatePolicy>
</snapshots>
</repository>
<repository>
<id>public</id>
<name>aliyun nexus</name>
<url>https://maven.aliyun.com/repository/public/</url>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
复制代码
功能实现
1、kettle-steps.xml
将源码中的kettle-steps.xml文件复制到src/main/resource/下,并添加如下内容:
<step id="HadoopFileInput"> <description>i18n:org.pentaho.di.trans.step:BaseStep.TypeLongDesc.HadoopFileInput</description> <classname>org.pentaho.big.data.kettle.plugins.hdfs.trans.HadoopFileInputMeta</classname><category>i18n:org.pentaho.di.trans.step:BaseStep.Category.BigData</category> <tooltip>i18n:org.pentaho.di.trans.step:BaseStep.TypeLongDesc.HadoopFileInput</tooltip> <iconfile>ui/images/HDI.svg</iconfile> <documentation_url></documentation_url> <cases_url/> <forum_url/></step><step id="HadoopFileOutput"> <description>i18n:org.pentaho.di.trans.step:BaseStep.TypeLongDesc.HadoopFileOutput</description> <classname>org.pentaho.big.data.kettle.plugins.hdfs.trans.HadoopFileOutputMeta</classname><category>i18n:org.pentaho.di.trans.step:BaseStep.Category.BigData</category> <tooltip>i18n:org.pentaho.di.trans.step:BaseStep.TypeLongDesc.HadoopFileOutput</tooltip> <iconfile>ui/images/HDO.svg</iconfile> <documentation_url></documentation_url> <cases_url/> <forum_url/>
</step>
复制代码
2、messages多语言
新建src/main/resources/org/pentaho/di/trans/step/messages/messages_en_US.properties、src/main/resources/org/pentaho/di/trans/step/messages/messages_zh_CN.properties文件,
加入下内容:
BaseStep.Category.BigData=Big DataBaseStep.TypeLongDesc.HadoopFileInput=Hadoop file input
BaseStep.TypeLongDesc.HadoopFileOutput=Hadoop file output
复制代码
3、图标
将源码中HDI.svg、HDO.svg两个图标复制到 src/main/resources/ui/images/下
4、其他配置
将big-data-plugin-8.2\legacy\src\test\resources\plugin.properties文件复制到自己项目的src\main\resources\plugin.properties。
将big-data-plugin-8.2\assemblies\pmr-libraries\src\main\resources\classes\pmr.properties文件复制到src\main\resources\pmr.properties。
将big-data-plugin-8.2\legacy\src\main\resources\META-INF\version.properties文件复制到src\main\resources\META-INF\version.properties。
将pdi-ce-8.2.0.0-342\data-integration\plugins\pentaho-big-data-plugin/hadoop-configurations/下配置文件复制到src\main\resources\hadoop-configurations。将红框中的文件复制。如下图:
5、Hadoop Plugin初始化配置
@Overridepublic void contextInitialized(ServletContextEvent context) {
try {
// 日志缓冲不超过5000行,缓冲时间不超过720秒
KettleLogStore.init(5000, 720);
KettleEnvironment.init();
// HadoopSpoonPlugin插件注册,读取plugin.properties文件 start
String classname = "org.pentaho.di.core.hadoop.HadoopSpoonPlugin";
Class<LifecyclePluginType> pluginType = LifecyclePluginType.class;
Map<Class<?>, String> classMap = new HashMap<Class<?>, String>();
PluginMainClassType mainClassTypesAnnotation = pluginType.getAnnotation( PluginMainClassType.class );
classMap.put( mainClassTypesAnnotation.value(), classname );
// 解决打包成jar包时从jar包里读取文件url的问题
URL resource = getClass().getClassLoader()
.getResource("");
String tempUri = resource.toString().replace("classes!", "classes");
URL url = new URL(tempUri);
System.out.println("new url " + url);
// end
Plugin plugin = new Plugin(new String[] {HadoopSpoonPlugin.PLUGIN_ID },
StepPluginType.class,
LifecyclePluginType.class.getAnnotation(PluginMainClassType.class).value(),
"", "", "", null, false, false,
classMap, new ArrayList<String>(), null, url);
PluginRegistry.getInstance().registerPlugin(LifecyclePluginType.class, plugin);
// HadoopSpoonPlugin插件注册 end
// HadoopConfigurationBootstrap实例化
HadoopConfigurationBootstrap hadoopConfigurationBootstrap = HadoopConfigurationBootstrap.getInstance();
// 赋值Prompter
hadoopConfigurationBootstrap.setPrompter(new HadoopConfigurationPrompter() {
@Override
public String getConfigurationSelection(final List<HadoopConfigurationInfo> hadoopConfigurationInfos) {
return "hdp30";
}
@Override
public void promptForRestart() {
}
});
// hadoop环境初始化,根据plugin.active.hadoop.configuration属性到对应目录下读取hadoop-configurations里的配置文件
hadoopConfigurationBootstrap.onEnvironmentInit();
// OSGI的东西
BundleContext bundleContext = SpringUtil.getBean("bundleContext");
ShimBridgingServiceTracker shimBridgingServiceTracker = new ShimBridgingServiceTracker();
// 初始化一些配置
HadoopFileSystemFactoryLoader hadoopFileSystemFactoryLoader =
new HadoopFileSystemFactoryLoader( bundleContext, shimBridgingServiceTracker, hadoopConfigurationBootstrap );
PropsUI.init("KettleWebConsole", Props.TYPE_PROPERTIES_KITCHEN);
} catch (Exception e) {
e.printStackTrace();
}
}
6、Kettle启动配置
@Componentpublic class KettleStart implements ApplicationContextAware, LifeEventHandler {
private volatile static KettleStart kettleStart;
public static KettleDatabaseRepositoryMeta meta;
private LogChannelInterface log;
private TransExecutionConfiguration transExecutionConfiguration;
private TransExecutionConfiguration transPreviewExecutionConfiguration;
private TransExecutionConfiguration transDebugExecutionConfiguration;
private JobExecutionConfiguration jobExecutionConfiguration;
public PropsUI props;
// loads the lifecycle listeners
private LifecycleSupport lifecycleSupport = new LifecycleSupport();
private KettleStart() {
metaStore = new DelegatingMetaStore();
try {
IMetaStore localMetaStore = MetaStoreConst.openLocalPentahoMetaStore();
metaStore.addMetaStore( localMetaStore );
metaStore.setActiveMetaStoreName( localMetaStore.getName() );
} catch ( MetaStoreException e ) {
log.logError( "Unable to open local Pentaho Metastore", e );
}
props = PropsUI.getPropsUIInstance();
log = new LogChannel( PropsUI.getAppName());
loadSettings();
transExecutionConfiguration = new TransExecutionConfiguration();
transExecutionConfiguration.setGatheringMetrics( true );
transPreviewExecutionConfiguration = new TransExecutionConfiguration();
transPreviewExecutionConfiguration.setGatheringMetrics( true );
transDebugExecutionConfiguration = new TransExecutionConfiguration();
transDebugExecutionConfiguration.setGatheringMetrics( true );
jobExecutionConfiguration = new JobExecutionConfiguration();
variables = new RowMetaAndData( new RowMeta() );
try {
lifecycleSupport.onStart( this );
} catch ( LifecycleException e ) {
e.printStackTrace();
}
}
public void loadSettings() {
LogLevel logLevel = LogLevel.getLogLevelForCode(props.getLogLevel());
DefaultLogLevel.setLogLevel(logLevel);
log.setLogLevel(logLevel);
KettleLogStore.getAppender().setMaxNrLines(props.getMaxNrLinesInLog());
DBCache.getInstance().setActive(props.useDBCache());
}
@Bean
public static KettleStart getInstance() {
if (kettleStart == null) {
synchronized (KettleStart.class) {
if(kettleStart == null) {
kettleStart = new KettleStart();
}
}
}
return kettleStart;
}
private Repository repository;
public Repository getRepository() {
return repository;
}
private Repository defaultRepository;
public Repository getDefaultRepository() {
return this.defaultRepository;
}
public void selectRepository(Repository repo) {
if(repository != null) {
repository.disconnect();
}
repository = repo;
}
private DelegatingMetaStore metaStore;
public DelegatingMetaStore getMetaStore() {
return metaStore;
}
public LogChannelInterface getLog() {
return log;
}
private RowMetaAndData variables = null;
private ArrayList<String> arguments = new ArrayList<String>();
public String[] getArguments() {
return arguments.toArray(new String[arguments.size()]);
}
public JobExecutionConfiguration getJobExecutionConfiguration() {
return jobExecutionConfiguration;
}
public TransExecutionConfiguration getTransDebugExecutionConfiguration() {
return transDebugExecutionConfiguration;
}
public TransExecutionConfiguration getTransPreviewExecutionConfiguration() {
return transPreviewExecutionConfiguration;
}
public TransExecutionConfiguration getTransExecutionConfiguration() {
return transExecutionConfiguration;
}
public RowMetaAndData getVariables() {
return variables;
}
}
7、bean配置
big-data-plugin插件一些类初始化在kettle客户端操作的,所以这里我们需要手动构建
// OSGI的东西@Bean
@Scope("singleton")
public BundleContext bundleContext() throws BundleException {
FrameworkFactory frameworkFactory = ServiceLoader
.load(FrameworkFactory.class).iterator().next();
Framework framework = frameworkFactory.newFramework(new HashMap<>());
framework.start();
BundleContext bundleContext = framework.getBundleContext();
return bundleContext;
}
@Bean(value = "namedClusterService" )
@Scope("singleton")
public NamedClusterService namedClusterService() {
BundleContext bundleContext = SpringUtil.getBean("bundleContext");
NamedClusterManager namedClusterManager = new NamedClusterManager();
namedClusterManager.setBundleContext(bundleContext);
return namedClusterManager;
}
@Bean(value = "baseMessagesMessageGetterFactoryImpl")
@Scope("singleton")
public BaseMessagesMessageGetterFactoryImpl baseMessagesMessageGetterFactoryImpl() {
return new BaseMessagesMessageGetterFactoryImpl();
}
@Bean(value = "runtimeTestActionHandlers")
@Scope("singleton")
public RuntimeTestActionHandler runtimeTestActionHandlers() {
BaseMessagesMessageGetterFactoryImpl baseMessagesMessageGetterFactoryImpl = SpringUtil.getBean("baseMessagesMessageGetterFactoryImpl");
return new LoggingRuntimeTestActionHandlerImpl(baseMessagesMessageGetterFactoryImpl);
}
@Bean(value = "loggingRuntimeTestActionHandlerImpl")
@Scope("singleton")
public LoggingRuntimeTestActionHandlerImpl loggingRuntimeTestActionHandlerImpl() {
BaseMessagesMessageGetterFactoryImpl baseMessagesMessageGetterFactoryImpl = SpringUtil.getBean("baseMessagesMessageGetterFactoryImpl");
return new LoggingRuntimeTestActionHandlerImpl(baseMessagesMessageGetterFactoryImpl);
}
@Bean(value = "runtimeTestActionService")
@Scope("singleton")
public RuntimeTestActionService runtimeTestActionService() {
LoggingRuntimeTestActionHandlerImpl runtimeTestActionHandler = SpringUtil.getBean("runtimeTestActionHandlers");
List<RuntimeTestActionHandler> runtimeTestActionHandlers = new ArrayList<>();
runtimeTestActionHandlers.add(runtimeTestActionHandler);
LoggingRuntimeTestActionHandlerImpl loggingRuntimeTestActionHandlerImpl = SpringUtil.getBean("loggingRuntimeTestActionHandlerImpl");
return new RuntimeTestActionServiceImpl(runtimeTestActionHandlers, loggingRuntimeTestActionHandlerImpl);
}
@Bean(value = "runtimeTesterImpl")
@Scope("singleton")
public LoggingRuntimeTestActionHandlerImpl runtimeTesterImpl() {
BaseMessagesMessageGetterFactoryImpl baseMessagesMessageGetterFactoryImpl = SpringUtil.getBean("baseMessagesMessageGetterFactoryImpl");
return new LoggingRuntimeTestActionHandlerImpl(baseMessagesMessageGetterFactoryImpl);
}
@Bean(value = "runtimeTester")
@Scope("singleton")
public RuntimeTester runtimeTester() {
List<String> orderedModules = new ArrayList<>();
orderedModules.add("Hadoop Configuration");
orderedModules.add("Hadoop File System");
orderedModules.add("Map Reduce");
orderedModules.add("Oozie");
orderedModules.add("Zookeeper");
return new RuntimeTesterImpl(null, null, orderedModules.toString());
}
// 由于HadoopFileInputMeta、HadoopFileOutputMeta构造方法有参,而kettle源码实例化时是以无参的方式实例化,所以这里直接创建bean,需要用到的地方引用
@Bean(value = "hadoopFileInputMeta")
@Scope("prototype")
public HadoopFileInputMeta hadoopFileInputMeta() throws KettlePluginException {
NamedClusterService namedClusterService = SpringUtil.getBean("namedClusterService");
RuntimeTestActionService runtimeTestActionService = SpringUtil.getBean("runtimeTestActionService");
RuntimeTester runtimeTester = SpringUtil.getBean("runtimeTester");
return new HadoopFileInputMeta(namedClusterService, runtimeTestActionService, runtimeTester);
}
@Bean(value = "hadoopFileOutputMeta")
@Scope("prototype")
public HadoopFileOutputMeta hadoopFileOutputMeta() {
NamedClusterService namedClusterService = SpringUtil.getBean("namedClusterService");
RuntimeTestActionService runtimeTestActionService = SpringUtil.getBean("runtimeTestActionService");
RuntimeTester runtimeTester = SpringUtil.getBean("runtimeTester");
return new HadoopFileOutputMeta(namedClusterService, runtimeTestActionService, runtimeTester);
}
@Bean(value = "clusterInitializerProviders")
@Scope("singleton")
public ClusterInitializerProvider clusterInitializerProviders() {
return new ClusterInitializerProviderImpl(HadoopConfigurationBootstrap.getInstance());
}
@Bean(value = "clusterInitializer")
@Scope("singleton")
public ClusterInitializer clusterInitializer() {
ClusterInitializerProvider clusterInitializerProviders = SpringUtil.getBean("clusterInitializerProviders");
List<ClusterInitializerProvider> list = new ArrayList<>();
list.add(clusterInitializerProviders);
return new ClusterInitializerImpl(list);
}
@Bean(value = "hdfsFileNameParser")
@Scope("singleton")
public HDFSFileNameParser hdfsFileNameParser() {
return HDFSFileNameParser.getInstance();
}
@Bean(value = "hadoopFileSystemFactories")
@Scope("singleton")
public HadoopFileSystemFactory hadoopFileSystemFactories() throws ConfigurationException {
HadoopConfigurationBootstrap hadoopConfigurationBootstrap = HadoopConfigurationBootstrap.getInstance();
HadoopConfiguration activeConfiguration = hadoopConfigurationBootstrap.getProvider().getActiveConfiguration();
return new HadoopFileSystemFactoryImpl(true, activeConfiguration, null);
}
@Bean(value = "hadoopFileSystemService")
@Scope("singleton")
public HadoopFileSystemLocator hadoopFileSystemService() {
HadoopFileSystemFactory hadoopFileSystemFactory = SpringUtil.getBean("hadoopFileSystemFactories");
List<HadoopFileSystemFactory> hadoopFileSystemFactories = new ArrayList<>();
hadoopFileSystemFactories.add(hadoopFileSystemFactory);
ClusterInitializer clusterInitializer = SpringUtil.getBean("clusterInitializer");
return new HadoopFileSystemLocatorImpl(hadoopFileSystemFactories, clusterInitializer);
}
// 实例化后会在PluginType里创建hdfs类型,在根据schema获取hdfs文件处理器时会用到
@Bean(value = "hDFSFileProvider")
@Scope("singleton")
public HDFSFileProvider hDFSFileProvider() throws FileSystemException {
HadoopFileSystemLocator hadoopFileSystemService = SpringUtil.getBean("hadoopFileSystemService");
NamedClusterService namedClusterService = SpringUtil.getBean("namedClusterService");
HDFSFileNameParser hdfsFileNameParser = SpringUtil.getBean("hdfsFileNameParser");
return new HDFSFileProvider(hadoopFileSystemService, namedClusterService, hdfsFileNameParser, "hdfs");
}
复制代码
8、源码修改
(1)HadoopFileInputMeta.java
解决属性读取不到问题。方法loadSourceRep里修改如下:
(2) KettleDatabaseRepositoryStepDelegate.java
方法loadStepMeta里修改如下:
if ( sp == null ) {stepMeta.setStepMetaInterface( new MissingTrans( stepMeta.getName(), stepMeta.getStepID() ) );
} else {
// 解决kettle底层无参构造hadoopFileInputMeta,hadoopFileOutputMeta无namedClusterService实例导致报错问题
if ("HadoopFileInput".equals(stepMeta.getStepID())) {
HadoopFileInputMeta hadoopFileInputMeta = SpringUtil.getBean("hadoopFileInputMeta");
stepMeta.setStepMetaInterface(hadoopFileInputMeta);
} else if("HadoopFileOutput".equals(stepMeta.getStepID())) {
HadoopFileOutputMeta hadoopFileOutputMeta = SpringUtil.getBean("hadoopFileOutputMeta");
stepMeta.setStepMetaInterface(hadoopFileOutputMeta);
} else {
stepMeta.setStepMetaInterface( (StepMetaInterface) registry.loadClass( sp ) );
}
}
(3) PluginPropertiesUtil.java
解决打成jar包后读取hadoop-configuration下的文件报找不到路径错误。
getPath获取的是不带schema的路径:例如:file:/D:/etl/target/classes/
而toString会带上schema,例如:jar:file:/D:/test/DC-etl-web.jar!/BOOT-INF/classes/
错误的根本原因看下图:
(4) HDFSFileProvider.java
修改内容见下图:
(5) HadoopFileSystemFactoryImpl.java
这里是比较重要的修改。由于kettle客户端软件使用hadoop集群时,换一个集群是需要替换hadoop-configuration下对应的配置文件(例如core-site.xml, hdfs-site.xml) 并且重启客户端;而我们集成到SpringBoot项目时,不可能换一个集群就重启项目,如何不重启就能使用任意一个hadoop集群,无论是否kerberos鉴权。修改源码部分见下:
@SneakyThrows@Override
public HadoopFileSystem create( NamedCluster namedCluster, URI uri ) throws IOException {
final URI finalUri = uri != null ? uri : URI.create( "" );
final HadoopShim hadoopShim = hadoopConfiguration.getHadoopShim();
final Configuration configuration = hadoopShim.createConfiguration();
configuration.set("fs.defaultFS", uri.toString());
// 解决用户名鉴权和kerberos不能同时访问问题
configuration.set("ipc.client.fallback-to-simple-auth-allowed", "true");
FileSystem fileSystem = null;
// 在新增hadoop cluster时,选择core-site.xml,hdfs-site.xml、hdfs.keytab、krb5.conf等文件插入数据库中;在此处根据URL(例如:hdfs://localhost:9000/)去表里读取鉴权和配置信息
EtlElementAuthService rElementAuthService = SpringUtil.getBean(EtlElementAuthService.class);
QueryWrapper<EtlElementAuthEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("url", uri.toString());
queryWrapper.eq("del_flag", "0");
EtlElementAuthEntity etlElementAuthEntity = rElementAuthService.getOne(queryWrapper);
if(ObjectUtil.isNull(etlElementAuthEntity)) {
throw new Exception("此hadoop cluster不存在鉴权信息");
}
if(EtlElementAuthEntity.KERBEROS.equals(etlElementAuthEntity.getKerberosAuth())) {
// kerberos鉴权
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
//base64文件转file,保存
String kerberosConf = etlElementAuthEntity.getKerberosConf();
String kerberosKeytab = etlElementAuthEntity.getKerberosKeytab();
String kerberosUser = etlElementAuthEntity.getKerberosUser();
String uuid = IdUtil.randomUUID();
String confName = base64ToFile(kerberosConf,"conf" + uuid);
String keytabName = base64ToFile(kerberosKeytab,"keytab" + uuid);
System.setProperty("java.security.krb5.conf", confName);
String coreSite = etlElementAuthEntity.getCoreSite();
ByteArrayInputStream coreSiteInputStream = new ByteArrayInputStream(base64ToByte(coreSite));
String hdfsSite = etlElementAuthEntity.getHdfsSite();
ByteArrayInputStream hdfsSiteInputStream = new ByteArrayInputStream(base64ToByte(hdfsSite));
conf.addResource(coreSiteInputStream);
conf.addResource(hdfsSiteInputStream);
conf.set("fs.defaultFS", uri.toString());
UserGroupInformation.setConfiguration(conf);
configuration.set("HADOOP_USER_NAME", "hadoop");
System.out.println("Security enabled " + UserGroupInformation.isSecurityEnabled());
try {
UserGroupInformation.loginUserFromKeytab(kerberosUser, keytabName);
} catch (IOException e) {
e.printStackTrace();
throw new KettleDatabaseException("kerbors认证失败",e);
}finally {
//验证完之后删除文件
File file=new File(confName);
file.delete();
File keytab=new File(keytabName);
keytab.delete();
}
fileSystem = (FileSystem) hadoopShim.getFileSystem( configuration ).getDelegate();
} else {
if(StrUtil.isNotBlank(namedCluster.getHdfsUsername())) {
fileSystem = (FileSystem) hadoopShim.getFileSystem(uri, configuration, namedCluster.getHdfsUsername() ).getDelegate();
} else {
fileSystem = (FileSystem) hadoopShim.getFileSystem( configuration ).getDelegate();
}
}
if ( fileSystem instanceof LocalFileSystem ) {
LOGGER.error( "Got a local filesystem, was expecting an hdfs connection" );
throw new IOException( "Got a local filesystem, was expecting an hdfs connection" );
}
return new HadoopFileSystemImpl(new HadoopFileSystemCallable() {
@Override
public FileSystem getFileSystem() {
try {
if( EtlElementAuthEntity.USERPASSWORD.equals(etlElementAuthEntity.getKerberosAuth()) && StrUtil.isNotBlank(namedCluster.getHdfsUsername())) {
return (FileSystem) hadoopShim.getFileSystem( finalUri, configuration, namedCluster.getHdfsUsername() ).getDelegate();
} else {
return (FileSystem) hadoopShim.getFileSystem( finalUri, configuration, null ).getDelegate();
}
} catch ( IOException e ) {
LOGGER.debug( "Error looking up/creating the file system ", e );
return null;
} catch ( InterruptedException e ) {
LOGGER.debug( "Error looking up/creating the file system ", e );
return null;
}
}
} );
}
@SneakyThrows
public static byte[] base64ToByte(String base64) {
File file = null;
//创建文件目录
int i = base64.indexOf(",");
if(i>0){
base64=base64.substring(i);
}
byte[] bytes = Base64.getMimeDecoder().decode(base64);
return bytes;
}
//BASE64解码成File文件
@SneakyThrows
public static String base64ToFile(String base64,String fileName) {
File file = null;
//创建文件目录
int i = base64.indexOf(",");
if(i>0){
base64=base64.substring(i);
}
//截取逗号前的文件名字
BufferedOutputStream bos = null;
java.io.FileOutputStream fos = null;
try {
byte[] bytes = Base64.getMimeDecoder().decode(base64);
file=new File(fileName);
if(file.exists()){
file.delete();
}
fos = new java.io.FileOutputStream(file);
bos = new BufferedOutputStream(fos);
bos.write(bytes);
file.createNewFile();
return fileName;
}finally {
if (bos != null) {
try {
bos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
复制代码CREATE TABLE `kettle_bigdata_auth` (
`id_element` bigint NOT NULL COMMENT 'PRIMARY KEY',
`url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'hadoop cluster url',
`kerberos_auth` tinyint DEFAULT '0' COMMENT '是否kerberos认证, 0:否 1:是',
`kerberos_conf` longblob COMMENT 'kerberos conf文件',
`kerberos_keytab` longblob COMMENT 'kerberos 秘钥文件',
`kerberos_user` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'kerberos认证用户名',
`principal` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'kerberos用户名',
`core_site` longblob COMMENT 'core-site.xml文件',
`hdfs_site` longblob COMMENT 'hdfs_site.xml文件',
`del_flag` char(1) COLLATE utf8mb4_general_ci DEFAULT '0' COMMENT '删除标记',
PRIMARY KEY (`id_element`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='hadoop cluster 鉴权相关表';
复制代码
错误解决
1、ClassNotFoundException: org.apache.commons.io.Charsets
pom.xml中将commons-io版本升级,我这里用的是2.5版本
2、No FileSystem for scheme: hdfs
加上这个bean,类实例化时会加上
@Bean(value = "hDFSFileProvider")@Scope("singleton")
public HDFSFileProvider hDFSFileProvider() throws FileSystemException {
HadoopFileSystemLocator hadoopFileSystemService = SpringUtil.getBean("hadoopFileSystemService");
NamedClusterService namedClusterService = SpringUtil.getBean("namedClusterService");
HDFSFileNameParser hdfsFileNameParser = SpringUtil.getBean("hdfsFileNameParser");
return new HDFSFileProvider(hadoopFileSystemService, namedClusterService, hdfsFileNameParser, "hdfs");
}
复制代码
3、Could not initialize class org.apache.hadoop.security.UserGroupInformation
缺少hadoop-common依赖,为了方便直接将如下依赖都加上:
<dependency><groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
复制代码
4、could not initialize class org.apache.hadoop.security.credentials
hadoop鉴权问题,前面内容已介绍
5、引入pentaho-hadoop-shims-hdp30-package依赖下载不了问题
只要排除如下依赖就可以了
<dependency><groupId>org.pentaho</groupId>
<artifactId>pentaho-hadoop-shims-hdp30-package</artifactId>
<version>8.2.2018.11.00-342</version>
<type>zip</type>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.netbeans</groupId>
<artifactId>mof</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-xml</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
复制代码
6、java.nio.file.FileSystemNotFoundException
见源码修改 -(3)
7、class org.apache.hive.service.rpc.thrift.TCLIService$Client has interface or...
隐约记得是依赖问题,可以按照上面的pomxml添加;
8、Got a local filesystem, was expecting an hdfs connection
原因是根据路径找到的不是hdfs schema; 而是file schema。 解决办法还是路径问题
9、SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
hadoop集群kerberos鉴权问题,参考上文的:源码修改 - (5)
10、No rules applied to hdfs/xxx@FAYSON.COM
core-site.xml文件里少属性
<property><name>hadoop.security.auth_to_local</name>
<value>RULE:[1:$1@$0](.*@\QFAYSON.COM\E$)s/@\QFAYSON.COM\E$//
RULE:[2:$1@$0](.*@\QFAYSON.COM\E$)s/@\QFAYSON.COM\E$//
DEFAULT</value>
</property>
复制代码
11、Server asks us to fall back to SIMPLE auth, but this client is configured to only allow secure connections
需要配置允许同时kerberos认证和用户名密码认证。
final Configuration configuration = hadoopShim.createConfiguration();configuration.set("fs.defaultFS", uri.toString());
// 解决用户名鉴权和kerberos不能同时访问问题
configuration.set("ipc.client.fallback-to-simple-auth-allowed", "true");
复制代码
注意
在需要用到HadoopFileInput、HadoopFileOutput的地方都需要手动赋值。例如:
PluginInterface sp = PluginRegistry.getInstance().getPlugin(StepPluginType.class, pluginId);if(sp != null) {
if(plugin instanceof HadoopFileOutput) {
StepMetaInterface stepMetaInterface = SpringUtil.getBean("hadoopFileOutputMeta");
clazz = stepMetaInterface.getClass();
} else {
StepMetaInterface stepMetaInterface = PluginRegistry.getInstance().loadClass(sp, StepMetaInterface.class);
clazz = stepMetaInterface.getClass();
}
} else {
sp = PluginRegistry.getInstance().getPlugin(JobEntryPluginType.class, pluginId);
JobEntryInterface jobEntryInterface = PluginRegistry.getInstance().loadClass(sp, JobEntryInterface.class);
clazz = jobEntryInterface.getClass();
}