下面的代码按时间分段上传数据;将 .trs 格式解析为 JSON 并发送 bulk 上传请求的代码位于 ToolClass 文件中(在底部)。
package @@@@@@@@@@@@@@;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import com.trs.hybase.client.TRSConnection;
import com.trs.hybase.client.TRSException;
import com.trs.hybase.client.TRSExport;
import com.trs.hybase.client.TRSRecord;
import com.trs.hybase.client.TRSResultSet;
import com.trs.hybase.client.params.ConnectParams;
import com.trs.hybase.client.params.SearchParams;
public class getDataByTimestamp {

    /**
     * Interactively copies HYBASE records into Elasticsearch, one time window at a time.
     *
     * <p>Reads four whitespace-separated tokens from standard input: the database name, the window length in
     * minutes, a start timestamp in {@code yyyyMMddHHmmss} form (or {@code pass} to start from the earliest
     * record whose time column is on or after 2000-01-01), and the name of the time column. For every window
     * it exports the matching HYBASE records to a temporary .trs file, bulk-indexes that file through
     * {@link ToolClass#sentToES}, deletes the file and prints timings. It stops once the window start reaches
     * the moment the program was started. Both the HYBASE connection and the Elasticsearch client are closed
     * on exit, including when an exception is thrown.
     */
    @SuppressWarnings("deprecation")
    public static void main(String[] args)
            throws TRSException, ParseException, IOException, InterruptedException, ExecutionException {
        // One Scanner for all prompts: every Scanner buffers ahead, so several Scanners on System.in can
        // swallow one another's tokens (for example when the input is piped in).
        @SuppressWarnings("resource")
        Scanner console = new Scanner(System.in);
        System.out.println("1/4.input the dataBase name:");
        String dbName = console.next();
        System.out.println("2/4.input the min_interval:");
        int minInterval = console.nextInt();
        System.out.println("3/4.input the timeStamp:");
        String timeStamp = console.next();
        System.out.println("4/4.input the columnName:");
        String readColumn = console.next();

        String hybaseDbName = dbName; // e.g. "szb_original_20160321"
        // Elasticsearch rejects index names that contain upper-case letters.
        String esIndexName = dbName.toLowerCase(java.util.Locale.ROOT);
        String esIp = "192.168.103.161";
        int esPort = 9300;
        String clusterName = "trs-yxl";
        SimpleDateFormat sft = new SimpleDateFormat("yyyyMMddHHmmss");
        // Random suffix keeps the temporary export file name unique between runs.
        String trsDataFilePath = "dataByTimeStemp_" + new Random().nextFloat() + ".trs";
        // Each window spans minInterval minutes. The range query bounds are inclusive at one-second
        // resolution, so the next window starts one second after the previous one ends.
        long windowMillis = minInterval * 60000L;
        long stepMillis = windowMillis + 1000L;
        Date nowTime = new Date();

        // Placeholders: replace with the HYBASE URL ("ip and port"), user name and password.
        TRSConnection conn = new TRSConnection("http://海贝ip和端口号", 用户名, 密码, new ConnectParams());
        TransportClient client = null;
        try {
            Date startTime;
            if (timeStamp.equals("pass")) {
                startTime = findEarliestTime(conn, hybaseDbName, readColumn);
                if (startTime == null) {
                    System.out.println("no record with a value in " + readColumn + " found in " + hybaseDbName);
                    return;
                }
            } else {
                startTime = sft.parse(timeStamp);
            }
            Date endTime = new Date(startTime.getTime() + windowMillis);

            TRSExport exp = new TRSExport(conn, trsDataFilePath);
            SearchParams paramInWhile = new SearchParams();
            ToolClass toolclass = new ToolClass();
            Settings settings = Settings.builder().put("cluster.name", clusterName).build();
            client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esIp), esPort));

            long totalTime = 0;
            // Compared at one-second resolution, the precision of sft.
            while (Long.parseLong(sft.format(startTime)) < Long.parseLong(sft.format(nowTime))) {
                long iterationStart = System.nanoTime();
                String from = sft.format(startTime);
                String to = sft.format(endTime);

                long downloadStart = System.nanoTime();
                // NOTE(review): the 0/10000 arguments presumably cap each export at 10000 records, so a denser
                // window would be truncated silently - confirm against the TRSExport API and size min_interval.
                exp.export(hybaseDbName, readColumn + ":[" + from + " TO " + to + "]", 0, 10000, paramInWhile);
                long downloadEnd = System.nanoTime();

                toolclass.sentToES(client, trsDataFilePath, esIp, esPort, esIndexName, "type", from);
                // Remove this window's export so that a later window with no data cannot re-upload it.
                deleteFile(trsDataFilePath);

                long iterationEnd = System.nanoTime();
                totalTime += iterationEnd - iterationStart;
                System.out.println("[" + from + " TO " + to + "]" + " hybaseTime:"
                        + (downloadEnd - downloadStart) / 1000000 + " time: "
                        + (iterationEnd - iterationStart) / 1000000 + "ms," + " wholeTtime:" + totalTime / 1000000
                        + "ms");
                System.out.println("");

                startTime = new Date(startTime.getTime() + stepMillis);
                endTime = new Date(endTime.getTime() + stepMillis);
            }
        } finally {
            try {
                if (client != null) {
                    client.close();
                }
            } finally {
                conn.close();
            }
        }
    }

    /**
     * Looks up the earliest value of {@code column} in {@code dbName}.
     *
     * <p>Only records whose column value lies in {@code [20000101000000 TO *]} are considered, which filters out
     * records without a time value. Results are sorted with {@code "+" + column} and only the first is read.
     *
     * @return the parsed time ({@code yyyy/MM/dd HH:mm:ss}), or {@code null} when no record matches
     */
    private static Date findEarliestTime(TRSConnection conn, String dbName, String column)
            throws TRSException, ParseException {
        SearchParams param = new SearchParams();
        param.setReadColumns(column);
        param.setSortMethod("+" + column);
        TRSResultSet resultSet = conn.executeSelect(dbName, column + ":[20000101000000 TO *" + "]", 0, 1, param);
        if (resultSet.size() == 0) {
            return null;
        }
        resultSet.moveNext();
        TRSRecord first = resultSet.get();
        return new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").parse(first.getString(column));
    }

    /**
     * Deletes {@code fileName} if it names an existing regular file.
     *
     * @param fileName path of the file to delete
     * @return {@code true} only if the file existed, was a regular file, and was deleted
     */
    public static boolean deleteFile(String fileName) {
        File file = new File(fileName);
        // isFile() is false for missing paths, so no separate exists() check is needed.
        return file.isFile() && file.delete();
    }
}
--------------------------------------------------------我是漂亮的分割线,下面是ToolClass文件-------------------------------------------------------------------
package @@@@@@@@@@@@@@@@@@@@@@@@
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import com.trs.hybase.client.TRSException;
public class ToolClass {
public void sentToES(TransportClient client, String inputFilePath, String ESIP, int ESPort, String index,
String type, String inputTime) throws IOException, InterruptedException, ExecutionException, TRSException {
//
int currentRECCount = 0;
// 批量提交对象建立
BulkRequestBuilder bulkRequest = client.prepareBulk();
//
long startTime = System.nanoTime();// 计时开始
// 读取hybase导出的文件
File file = new File(inputFilePath);
if (file.isFile() && file.exists()) { // 判断文件是否存在
InputStreamReader read = new InputStreamReader(new FileInputStream(file), "gbk");
BufferedReader bufferedReader = new BufferedReader(read);
XContentBuilder aRecordBuilder = null;
String lineString = null;
String field = "";
StringBuilder content = new StringBuilder();
boolean contentHasContent = false;
// // 设置id
// String keiid = "IR_HKEY";
// String tempid = null;
while ((lineString = bufferedReader.readLine()) != null) {
// System.out.println(lineString);
//
// // 设置id
// if (lineString.matches("^<" + keiid + ">=.*$") == true) {
// tempid = lineString
// .replace(lineString.split(">=")[0].substring(0,
// lineString.split(">=")[0].length()), "")
// .substring(2);
// }
if (lineString.equals("
")) { // 当内容变量不为空的时候,发送之前字段的数据@@ if (contentHasContent == true) { aRecordBuilder.field(field, content.toString()); // 清空content内容 content.setLength(0); contentHasContent = false; } currentRECCount++; // 封闭aRecordBuilder if (aRecordBuilder != null) { aRecordBuilder.endObject(); bulkRequest.add(client.prepareIndex(index, type).setSource(aRecordBuilder)); aRecordBuilder = null; } aRecordBuilder = jsonBuilder().startObject(); continue; } // 当遇到标签的时候 if (lineString.matches("^<.*>=.*$") == true) { // 当内容变量不为空的时候,发送之前的数据@@ if (contentHasContent == true) { aRecordBuilder.field(field, content.toString()); // 清空content内容 content.setLength(0); contentHasContent = false; } field = lineString.split(">=")[0].substring(1, lineString.split(">=")[0].length()); // 将标签等于号后面的所有内容全部添加进content content.append(lineString .replace(lineString.split(">=")[0].substring(0, lineString.split(">=")[0].length()), "") .substring(2)); contentHasContent = true; continue; } // 当lineString为无标签内容行 if (lineString.matches("^<.*>=.*$") == false) { contentHasContent = true; content.append(lineString); continue; } } // 最后一次提交字段 if (contentHasContent == true) { aRecordBuilder.field(field, content.toString()); // 清空content内容 content.setLength(0); contentHasContent = false; } // 最后一次闭合aRecordBuilder aRecordBuilder.endObject(); currentRECCount++; bulkRequest.add(client.prepareIndex(index, type).setSource(aRecordBuilder)); aRecordBuilder = null; // , inputTime + "-" + RECCount currentRECCount--; // 上传ES BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { System.out.println(inputTime + "submit error!!!"); System.out.println(bulkResponse.buildFailureMessage()); } // 关闭提交 read.close(); bufferedReader.close(); // 检测 long endTime = System.nanoTime(); // 获取结束时间 System.out.println("toESTime:" + (endTime - startTime) / 1000000 + "ms" + "-----currentRecordCount:[" + currentRECCount + "]-----timeStamp:" + inputTime); } } }
