下面的代码按时间分段从 Hybase 导出数据并上传到 Elasticsearch;将 .trs 格式解析为 JSON 并通过 bulk 请求上传的代码位于文末的 ToolClass 文件中。 package @@@@@@@@@@@@@@;import java.io.File;import java.io.IOException;import java.net.Ine
package @@@@@@@@@@@@@@; import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Random; import java.util.Scanner; import java.util.concurrent.ExecutionException; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import com.trs.hybase.client.TRSConnection; import com.trs.hybase.client.TRSException; import com.trs.hybase.client.TRSExport; import com.trs.hybase.client.TRSRecord; import com.trs.hybase.client.TRSResultSet; import com.trs.hybase.client.params.ConnectParams; import com.trs.hybase.client.params.SearchParams; public class getDataByTimestamp { @SuppressWarnings("deprecation") public static void main(String[] args) throws TRSException, ParseException, IOException, InterruptedException, ExecutionException { String dbName = null; String readColumn = null; String timeStamp = null; // 控制台接收数据 System.out.println("1/4.input the dataBase name:"); @SuppressWarnings("resource") Scanner temp = new Scanner(System.in); dbName = temp.next(); System.out.println("2/4.input the min_interval:"); @SuppressWarnings("resource") Scanner temp2 = new Scanner(System.in); int timeIntervl = temp2.nextInt(); System.out.println("3/4.input the timeStamp:"); @SuppressWarnings("resource") Scanner temp3 = new Scanner(System.in); timeStamp = temp3.next(); System.out.println("4/4.input the columnName:"); @SuppressWarnings("resource") Scanner temp4 = new Scanner(System.in); readColumn = temp4.next(); // 设置变量 String HYBASEDBNAME = dbName;// "szb_original_20160321"; String ESDBNAME = dbName;// "szb_original_20160321"; String ESIP = "192.168.103.161";// 192.168.103.161 String clustName = "trs-yxl";// elasticsearch trs-yxl SimpleDateFormat sft = new 
SimpleDateFormat("yyyyMMddHHmmss"); // 为每一个数据临时文件增加独一无二的时间戳,防止文件名称重复导致的空指针异常 Random random = new Random(); String trsDataFilePath = "dataByTimeStemp_" + random.nextFloat() + ".trs"; // 按照时间戳检索上传数据的时间间隔 int min_interval = timeIntervl;// 1000; // ES的端口号 int ESPORT = 9300; Date startTime = null; Date endTime = null; Date nowTime = new Date(); // END // 建立HYBASE连接 TRSConnection conn = new TRSConnection("http://海贝ip和端口号", 用户名, 密码, new ConnectParams()); // 建立HYBASE数据导出 TRSExport exp = new TRSExport(conn, trsDataFilePath); // 获取最早记录时间 SearchParams param = new SearchParams(); SearchParams paramInWhile = new SearchParams(); param.setReadColumns(readColumn); param.setSortMethod("+" + readColumn); // 若时间字段没有时间内容则通过检索条件过滤 TRSResultSet resultSet = conn.executeSelect(HYBASEDBNAME, readColumn + ":[20000101000000 TO *" + "]", 0, 1, param); for (int i = 0; i < resultSet.size(); i++) { resultSet.moveNext(); TRSRecord re = resultSet.get(); SimpleDateFormat temp_sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); startTime = temp_sdf.parse(re.getString(readColumn)); endTime = temp_sdf.parse(re.getString(readColumn)); } // 如果控制没有输入pass,则输入内容被当作制定指定时间戳 if (!timeStamp.equals("pass")) { startTime = sft.parse(timeStamp); endTime = sft.parse(timeStamp); } // 按照时间循环读数据,与startTime形成区间 endTime.setMinutes(endTime.getMinutes() + min_interval); // 声明工具类对象 ToolClass toolclass = new ToolClass(); // 设置ES参数 Settings settings = Settings.builder().put("cluster.name", clustName).build(); @SuppressWarnings("resource") TransportClient client = new PreBuiltTransportClient(settings) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ESIP), ESPORT)); // 开始循环读写数据 long totalTime = 0; while (true) { long startTime2 = System.nanoTime();// 计时开始 // 当超出当前最新时间的时候退出循环 if (Long.parseLong(sft.format(startTime).toString()) >= Long.parseLong(sft.format(nowTime).toString())) { break; } // 下载HYBASE long downloadTime = System.nanoTime(); exp.export(HYBASEDBNAME, readColumn + ":[" + 
sft.format(startTime).toString() + " TO " + sft.format(endTime).toString() + "]", 0, 10000, paramInWhile); long downloadTimeEnd = System.nanoTime(); // 上传ES toolclass.sentToES(client, trsDataFilePath, ESIP, ESPORT, ESDBNAME, "type", sft.format(startTime).toString()); // 删除当前trs文件,避免下次无数据的时候重复上传 deleteFile(trsDataFilePath); long endTime2 = System.nanoTime();// 计时 totalTime = totalTime + (endTime2 - startTime2); // 控制台输出 System.out.println("[" + sft.format(startTime).toString() + " TO " + sft.format(endTime).toString() + "]" + " hybaseTime:" + (downloadTimeEnd - downloadTime) / 1000000 + " time: " + (endTime2 - startTime2) / 1000000 + "ms," + " wholeTtime:" + totalTime / 1000000 + "ms"); System.out.println(""); // 单词循环结束的时候累加时间 startTime.setMinutes(startTime.getMinutes() + min_interval); startTime.setSeconds(startTime.getSeconds() + 1); endTime.setMinutes(endTime.getMinutes() + min_interval); endTime.setSeconds(endTime.getSeconds() + 1); } // 关闭conn conn.close(); // END } public static boolean deleteFile(String fileName) { File file = new File(fileName); // 如果文件路径所对应的文件存在,并且是一个文件,则直接删除 if (file.exists() && file.isFile()) { if (file.delete()) { return true; } else { return false; } } else { return false; } } } --------------------------------------------------------我是漂亮的分割线,下面是ToolClass文件------------------------------------------------------------------- package @@@@@@@@@@@@@@@@@@@@@@@@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.util.concurrent.ExecutionException; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.xcontent.XContentBuilder; import com.trs.hybase.client.TRSException; public class ToolClass { public void 
sentToES(TransportClient client, String inputFilePath, String ESIP, int ESPort, String index, String type, String inputTime) throws IOException, InterruptedException, ExecutionException, TRSException { // int currentRECCount = 0; // 批量提交对象建立 BulkRequestBuilder bulkRequest = client.prepareBulk(); // long startTime = System.nanoTime();// 计时开始 // 读取hybase导出的文件 File file = new File(inputFilePath); if (file.isFile() && file.exists()) { // 判断文件是否存在 InputStreamReader read = new InputStreamReader(new FileInputStream(file), "gbk"); BufferedReader bufferedReader = new BufferedReader(read); XContentBuilder aRecordBuilder = null; String lineString = null; String field = ""; StringBuilder content = new StringBuilder(); boolean contentHasContent = false; // // 设置id // String keiid = "IR_HKEY"; // String tempid = null; while ((lineString = bufferedReader.readLine()) != null) { // System.out.println(lineString); //// // 设置id // if (lineString.matches("^<" + keiid + ">=.*$") == true) { // tempid = lineString // .replace(lineString.split(">=")[0].substring(0, // lineString.split(">=")[0].length()), "") // .substring(2); // } if (lineString.equals(" ")) { // 当内容变量不为空的时候,发送之前字段的数据@@ if (contentHasContent == true) { aRecordBuilder.field(field, content.toString()); // 清空content内容 content.setLength(0); contentHasContent = false; } currentRECCount++; // 封闭aRecordBuilder if (aRecordBuilder != null) { aRecordBuilder.endObject(); bulkRequest.add(client.prepareIndex(index, type).setSource(aRecordBuilder)); aRecordBuilder = null; } aRecordBuilder = jsonBuilder().startObject(); continue; } // 当遇到标签的时候 if (lineString.matches("^<.*>=.*$") == true) { // 当内容变量不为空的时候,发送之前的数据@@ if (contentHasContent == true) { aRecordBuilder.field(field, content.toString()); // 清空content内容 content.setLength(0); contentHasContent = false; } field = lineString.split(">=")[0].substring(1, lineString.split(">=")[0].length()); // 将标签等于号后面的所有内容全部添加进content content.append(lineString 
.replace(lineString.split(">=")[0].substring(0, lineString.split(">=")[0].length()), "") .substring(2)); contentHasContent = true; continue; } // 当lineString为无标签内容行 if (lineString.matches("^<.*>=.*$") == false) { contentHasContent = true; content.append(lineString); continue; } } // 最后一次提交字段 if (contentHasContent == true) { aRecordBuilder.field(field, content.toString()); // 清空content内容 content.setLength(0); contentHasContent = false; } // 最后一次闭合aRecordBuilder aRecordBuilder.endObject(); currentRECCount++; bulkRequest.add(client.prepareIndex(index, type).setSource(aRecordBuilder)); aRecordBuilder = null; // , inputTime + "-" + RECCount currentRECCount--; // 上传ES BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { System.out.println(inputTime + "submit error!!!"); System.out.println(bulkResponse.buildFailureMessage()); } // 关闭提交 read.close(); bufferedReader.close(); // 检测 long endTime = System.nanoTime(); // 获取结束时间 System.out.println("toESTime:" + (endTime - startTime) / 1000000 + "ms" + "-----currentRecordCount:[" + currentRECCount + "]-----timeStamp:" + inputTime); } } }