SinkMysql package com.slfoun.flume.sink;import com.google.common.base.Preconditions;import com.google.common.base.Throwables;import com.google.common.collect.Lists;import org.apache.flume.*;import org.apache.flume.conf.Configurable;import o
package com.slfoun.flume.sink; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.List; /* */ public class MysqlSink extends AbstractSink implements Configurable { private Logger LOG = LoggerFactory.getLogger(MysqlSink.class); private String hostname; private String port; private String databaseName; private String tableName; private String user; private String password; private PreparedStatement preparedStatement; private Connection conn; private int batchSize; public MysqlSink() { LOG.info("MysqlSink start..."); } @Override public Status process() throws EventDeliveryException { Status result = Status.READY; Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); Event event; String content; Listinfos = Lists.newArrayList(); transaction.begin(); try { for (int i = 0; i < batchSize; i++) { event = channel.take(); if (event != null) {//对事件进行处理 //event 的 body 为 "exec tail$i , abel" content = new String(event.getBody()); Info info = new Info(); if (content.contains(",")) { //存储 event 的 content info.setContent(content.substring(0, content.indexOf(","))); //存储 event 的 create +1 是要减去那个 "," info.setCreateBy(content.substring(content.indexOf(",") + 1)); } else { info.setContent(content); } infos.add(info); } else { result = Status.BACKOFF; break; } } if (infos.size() > 0) { preparedStatement.clearBatch(); for (Info temp : infos) { preparedStatement.setString(1, temp.getContent()); preparedStatement.setString(2, temp.getCreateBy()); preparedStatement.addBatch(); } preparedStatement.executeBatch(); conn.commit(); } transaction.commit(); } catch (Exception e) { try { transaction.rollback(); } catch (Exception e2) { LOG.error("Exception in rollback. Rollback might not have been" + "successful.", e2); } LOG.error("Failed to commit transaction." + "Transaction rolled back.", e); Throwables.propagate(e); } finally { transaction.close(); } return result; } @Override public void configure(Context context) { hostname = context.getString("hostname"); Preconditions.checkNotNull(hostname, "hostname must be set!!"); port = context.getString("port"); Preconditions.checkNotNull(port, "port must be set!!"); databaseName = context.getString("databaseName"); Preconditions.checkNotNull(databaseName, "databaseName must be set!!"); tableName = context.getString("tableName"); Preconditions.checkNotNull(tableName, "tableName must be set!!"); user = context.getString("user"); Preconditions.checkNotNull(user, "user must be set!!"); password = context.getString("password"); Preconditions.checkNotNull(password, "password must be set!!"); batchSize = context.getInteger("batchSize", 100); Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!"); } @Override public void start() { super.start(); try { Class.forName("com.mysql.jdbc.Driver"); } catch (ClassNotFoundException e) { e.printStackTrace(); } String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName; //调用DriverManager对象的getConnection()方法,获得一个Connection对象 try { conn = DriverManager.getConnection(url, user, password); } catch (SQLException e) { e.printStackTrace(); System.exit(1); } } @Override public void stop() { super.stop(); if (preparedStatement != null) { try { preparedStatement.close(); } catch (SQLException e) { e.printStackTrace(); } } if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } }