当前位置 : 主页 > 编程语言 > java >

HYBASE同步数据到ES(HYBASE输出格式默认为.trs)

来源:互联网 收集:自由互联 发布时间:2021-06-28
这上面的代码是根据时间分段上传,解析.trs格式到json格式,并且发送bulk上传的代码在ToolClass文件(在底部)中。 package @@@@@@@@@@@@@@;import java.io.File;import java.io.IOException;import java.net.Ine
下面的代码按时间窗口分段同步数据;将.trs格式解析为JSON并通过bulk请求上传的代码位于ToolClass文件中(见底部)。
package @@@@@@@@@@@@@@;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import com.trs.hybase.client.TRSConnection;
import com.trs.hybase.client.TRSException;
import com.trs.hybase.client.TRSExport;
import com.trs.hybase.client.TRSRecord;
import com.trs.hybase.client.TRSResultSet;
import com.trs.hybase.client.params.ConnectParams;
import com.trs.hybase.client.params.SearchParams;

/**
 * Copies records from a HYBASE database into the Elasticsearch index of the same name, one time
 * window at a time.
 *
 * <p>Four values are read from standard input: the database name, the window length in minutes,
 * a start timestamp ({@code yyyyMMddHHmmss}, or {@code pass} to start at the earliest record) and
 * the name of the time column. Each window is exported from HYBASE into a temporary .trs file,
 * uploaded with {@link ToolClass#sentToES}, and the file is deleted afterwards. The run stops once
 * a window would start at or after the second at which the program was started.
 */
public class getDataByTimestamp {

	/** One minute in milliseconds; used to move the time windows. */
	private static final long MINUTE_MILLIS = 60_000L;

	/** One second in milliseconds; gap inserted between consecutive windows. */
	private static final long SECOND_MILLIS = 1_000L;

	/**
	 * Record-count argument passed to {@code TRSExport.export} for each window (presumably: start at
	 * offset 0, export up to this many records). NOTE(review): a window holding more records than
	 * this may be exported only partially — verify against the HYBASE API.
	 */
	private static final int RECORDS_PER_WINDOW = 10000;

	public static void main(String[] args)
			throws TRSException, ParseException, IOException, InterruptedException, ExecutionException {
		// Read every answer through ONE Scanner: a Scanner buffers input ahead, so a second Scanner
		// on System.in can miss tokens already consumed by the first one (e.g. with piped input).
		@SuppressWarnings("resource") // closing it would also close System.in
		Scanner in = new Scanner(System.in);
		System.out.println("1/4.input the dataBase name:");
		String dbName = in.next();
		System.out.println("2/4.input the min_interval:");
		int minInterval = in.nextInt();
		System.out.println("3/4.input the timeStamp:");
		String timeStamp = in.next();
		System.out.println("4/4.input the columnName:");
		String readColumn = in.next();

		// A negative window moves the start time backwards, so the loop below would never end.
		if (minInterval < 0) {
			throw new IllegalArgumentException("min_interval must not be negative: " + minInterval);
		}

		String hybaseDbName = dbName; // e.g. "szb_original_20160321"
		String esIndexName = dbName;
		String esHost = "192.168.103.161";
		int esPort = 9300;
		String clusterName = "trs-yxl";
		SimpleDateFormat sft = new SimpleDateFormat("yyyyMMddHHmmss");

		// Random suffix so that separate runs do not share (and delete) the same temporary file.
		String trsDataFilePath = "dataByTimeStemp_" + new Random().nextFloat() + ".trs";

		// Windows starting at or after this second are not processed.
		long stopStamp = Long.parseLong(sft.format(new Date()));

		// Placeholders: fill in the HYBASE address and credentials before compiling.
		TRSConnection conn = new TRSConnection("http://海贝ip和端口号", 用户名, 密码, new ConnectParams());
		try {
			// "pass" means: start at the earliest record; anything else is an explicit start time.
			Date startTime;
			if (timeStamp.equals("pass")) {
				startTime = findEarliestTime(conn, hybaseDbName, readColumn);
				if (startTime == null) {
					throw new IllegalStateException(
							"no record with a value in column " + readColumn + " found in " + hybaseDbName);
				}
			} else {
				startTime = sft.parse(timeStamp);
			}
			long stepMillis = minInterval * MINUTE_MILLIS;
			Date endTime = new Date(startTime.getTime() + stepMillis);

			TRSExport exp = new TRSExport(conn, trsDataFilePath);
			SearchParams exportParams = new SearchParams();
			ToolClass toolclass = new ToolClass();

			Settings settings = Settings.builder().put("cluster.name", clusterName).build();
			TransportClient client = new PreBuiltTransportClient(settings)
					.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHost), esPort));
			try {
				long totalTime = 0;
				while (Long.parseLong(sft.format(startTime)) < stopStamp) {
					long loopStart = System.nanoTime();
					String from = sft.format(startTime);
					String to = sft.format(endTime);

					long downloadStart = System.nanoTime();
					long downloadEnd;
					try {
						exp.export(hybaseDbName, readColumn + ":[" + from + " TO " + to + "]", 0,
								RECORDS_PER_WINDOW, exportParams);
						downloadEnd = System.nanoTime();
						toolclass.sentToES(client, trsDataFilePath, esHost, esPort, esIndexName, "type", from);
					} finally {
						// Always delete the export so a later window without data cannot re-upload it.
						deleteFile(trsDataFilePath);
					}

					long loopEnd = System.nanoTime();
					totalTime += loopEnd - loopStart;
					System.out.println("[" + from + " TO " + to + "]" + "   hybaseTime:"
							+ (downloadEnd - downloadStart) / 1000000 + "   time: " + (loopEnd - loopStart) / 1000000
							+ "ms," + "   wholeTtime:" + totalTime / 1000000 + "ms");
					System.out.println("");

					// The next window starts one second after this one ends: the "[a TO b]" range
					// presumably includes both ends and the timestamps have one-second resolution.
					startTime = new Date(startTime.getTime() + stepMillis + SECOND_MILLIS);
					endTime = new Date(endTime.getTime() + stepMillis + SECOND_MILLIS);
				}
			} finally {
				client.close();
			}
		} finally {
			conn.close();
		}
	}

	/**
	 * Returns the value of {@code readColumn} in the first record of {@code dbName} sorted in
	 * ascending order by that column, or {@code null} if the query returns no record.
	 *
	 * <p>Only values from 2000-01-01 onwards are considered; the range condition also filters out
	 * records whose time column is empty.
	 */
	private static Date findEarliestTime(TRSConnection conn, String dbName, String readColumn)
			throws TRSException, ParseException {
		SearchParams param = new SearchParams();
		param.setReadColumns(readColumn);
		param.setSortMethod("+" + readColumn);
		TRSResultSet resultSet = conn.executeSelect(dbName, readColumn + ":[20000101000000 TO *" + "]", 0, 1,
				param);

		// The time column is stored as "yyyy/MM/dd HH:mm:ss".
		SimpleDateFormat columnFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
		Date earliest = null;
		for (int i = 0; i < resultSet.size(); i++) {
			resultSet.moveNext();
			TRSRecord record = resultSet.get();
			earliest = columnFormat.parse(record.getString(readColumn));
		}
		return earliest;
	}

	/**
	 * Deletes {@code fileName} if it is a regular file.
	 *
	 * @param fileName path of the file to delete
	 * @return {@code true} if the file existed and was deleted, {@code false} otherwise
	 */
	public static boolean deleteFile(String fileName) {
		File file = new File(fileName);
		// isFile() is false for paths that do not exist, so no separate exists() check is needed.
		return file.isFile() && file.delete();
	}
}

--------------------------------------------------------我是漂亮的分割线,下面是ToolClass文件-------------------------------------------------------------------

package @@@@@@@@@@@@@@@@@@@@@@@@

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;

import com.trs.hybase.client.TRSException;

public class ToolClass {

	public void sentToES(TransportClient client, String inputFilePath, String ESIP, int ESPort, String index,
			String type, String inputTime) throws IOException, InterruptedException, ExecutionException, TRSException {
		//
		int currentRECCount = 0;
		// 批量提交对象建立
		BulkRequestBuilder bulkRequest = client.prepareBulk();
		//
		long startTime = System.nanoTime();// 计时开始

		// 读取hybase导出的文件
		File file = new File(inputFilePath);
		if (file.isFile() && file.exists()) { // 判断文件是否存在

			InputStreamReader read = new InputStreamReader(new FileInputStream(file), "gbk");
			BufferedReader bufferedReader = new BufferedReader(read);
			XContentBuilder aRecordBuilder = null;
			String lineString = null;
			String field = "";
			StringBuilder content = new StringBuilder();
			boolean contentHasContent = false;

			// // 设置id
			// String keiid = "IR_HKEY";
			// String tempid = null;

			while ((lineString = bufferedReader.readLine()) != null) {
				// System.out.println(lineString);
				// 
 
  

				// // 设置id
				// if (lineString.matches("^<" + keiid + ">=.*$") == true) {
				// tempid = lineString
				// .replace(lineString.split(">=")[0].substring(0,
				// lineString.split(">=")[0].length()), "")
				// .substring(2);
				// }

				if (lineString.equals("
  
   ")) { // 当内容变量不为空的时候,发送之前字段的数据@@ if (contentHasContent == true) { aRecordBuilder.field(field, content.toString()); // 清空content内容 content.setLength(0); contentHasContent = false; } currentRECCount++; // 封闭aRecordBuilder if (aRecordBuilder != null) { aRecordBuilder.endObject(); bulkRequest.add(client.prepareIndex(index, type).setSource(aRecordBuilder)); aRecordBuilder = null; } aRecordBuilder = jsonBuilder().startObject(); continue; } // 当遇到标签的时候 if (lineString.matches("^<.*>=.*$") == true) { // 当内容变量不为空的时候,发送之前的数据@@ if (contentHasContent == true) { aRecordBuilder.field(field, content.toString()); // 清空content内容 content.setLength(0); contentHasContent = false; } field = lineString.split(">=")[0].substring(1, lineString.split(">=")[0].length()); // 将标签等于号后面的所有内容全部添加进content content.append(lineString .replace(lineString.split(">=")[0].substring(0, lineString.split(">=")[0].length()), "") .substring(2)); contentHasContent = true; continue; } // 当lineString为无标签内容行 if (lineString.matches("^<.*>=.*$") == false) { contentHasContent = true; content.append(lineString); continue; } } // 最后一次提交字段 if (contentHasContent == true) { aRecordBuilder.field(field, content.toString()); // 清空content内容 content.setLength(0); contentHasContent = false; } // 最后一次闭合aRecordBuilder aRecordBuilder.endObject(); currentRECCount++; bulkRequest.add(client.prepareIndex(index, type).setSource(aRecordBuilder)); aRecordBuilder = null; // , inputTime + "-" + RECCount currentRECCount--; // 上传ES BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { System.out.println(inputTime + "submit error!!!"); System.out.println(bulkResponse.buildFailureMessage()); } // 关闭提交 read.close(); bufferedReader.close(); // 检测 long endTime = System.nanoTime(); // 获取结束时间 System.out.println("toESTime:" + (endTime - startTime) / 1000000 + "ms" + "-----currentRecordCount:[" + currentRECCount + "]-----timeStamp:" + inputTime); } } }
  
 
网友评论