当前位置 : 主页 > 编程语言 > c++ >

使用 Phoenix 和 JdbcTemplate 操作 HBase(生产示例)

来源:互联网 收集:自由互联 发布时间:2021-07-03
接口 package com.mimidai.common.dao.hbase;import java.util.List;import com.mimidai.common.entity.user.UserPhoneContacts;import com.mimidai.common.utils.persistence.Page;/** * 用户通讯录接口 core0 * cms query count delete * * @autho
接口
package com.mimidai.common.dao.hbase;

import java.util.List;

import com.mimidai.common.entity.user.UserPhoneContacts;
import com.mimidai.common.utils.persistence.Page;

/**
 * DAO for the user phone-contacts table stored in HBase (core0).
 * Used by the CMS for query, count and delete operations.
 *
 * @author EVE
 */
public interface HBaseUserPhoneContactsDao {

    /**
     * Queries contacts using the fields of the given example instance as filters.
     *
     * @param userPhoneContacts
     *            example instance carrying the filter values
     * @param disableUserId
     *            whether rows belonging to the example's userId are excluded
     *            instead of selected
     * @param sort
     *            whether the result is returned in descending order
     * @return the matching contacts
     */
    List<UserPhoneContacts> query(UserPhoneContacts userPhoneContacts, boolean disableUserId,
            boolean sort);

    /**
     * Queries contacts using the fields of the given example instance as filters
     * and stores the outcome in the supplied page.
     *
     * @param userPhoneContacts
     *            example instance carrying the filter values
     * @param page
     *            page object that receives the result
     * @return the page that was passed in
     */
    Page<UserPhoneContacts> query(UserPhoneContacts userPhoneContacts, Page<UserPhoneContacts> page);

    /**
     * Queries contacts with a caller-supplied SQL fragment.
     *
     * @param whereSql
     *            the SQL text appended after the select statement (the where
     *            clause and anything following it)
     * @return the matching contacts
     */
    List<UserPhoneContacts> query(String whereSql);

    /**
     * Deletes the contact identified by the id of the given instance.
     *
     * @param userPhoneContacts
     *            instance whose id selects the row to delete
     */
    void deleteById(UserPhoneContacts userPhoneContacts);

    /**
     * Inserts the given contacts.
     *
     * @param userPhoneContactsList
     *            contacts to insert
     */
    void add(List<UserPhoneContacts> userPhoneContactsList);

    /**
     * Counts the contacts matching the given example instance.
     *
     * @param userPhoneContacts
     *            example instance carrying the filter values
     * @return the number of matching rows
     */
    Long count(UserPhoneContacts userPhoneContacts);

    /**
     * Updates the given contacts by id.
     *
     * @param userPhoneContactsList
     *            contacts to update; each one is matched by its id
     */
    void updateById(List<UserPhoneContacts> userPhoneContactsList);
}
      
     
    
   
  
 
接口实现类
package com.mimidai.common.utils.hbase;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import com.mimidai.common.dao.hbase.HBaseUserPhoneContactsDao;
import com.mimidai.common.entity.user.UserPhoneContacts;
import com.mimidai.common.utils.persistence.Page;

@Repository
public class HBaseUserPhoneContactsDaoImpl implements HBaseUserPhoneContactsDao {

    private static Logger logger = LoggerFactory
            .getLogger("log.hbase.HBaseUserPhoneContactsDaoImpl");

    @Autowired
    @Qualifier("phoenixJdbcTemplate")
    private JdbcTemplate jdbcTemplate;
    
    private SimpleDateFormat df = new SimpleDateFormat("MMM d, yyyy h:mm:ss aaa",Locale.ENGLISH);

    @Override
    public List
 
   query(UserPhoneContacts userPhoneContacts, boolean disableUserId,
            boolean sort) {
        logger.warn("【HBase】进入查询(core0)方法");
        if (userPhoneContacts == null) {
            logger.error("【HBase】(core0)入参userPhoneContacts为NULL");
            return null;
        }
        StringBuffer sb = new StringBuffer(
                " select id,userId,imei,phone,name,callTimes,lastCall as lastCallTwo from phoneContacts where 1=1 ");
        if (userPhoneContacts.getUserId() != null) {
            if (!disableUserId) {
                sb.append(" and userId =  ").append(userPhoneContacts.getUserId());
                if (StringUtils.isNotBlank(userPhoneContacts.getPhone())) {
                    String phone = "'" + userPhoneContacts.getPhone() + "'";
                    sb.append(" and phone =  ").append(phone);
                }
                if (StringUtils.isNotBlank(userPhoneContacts.getImei())) {
                    String imei = "'" + userPhoneContacts.getImei() + "'";
                    sb.append(" and imei = ").append(imei);
                }
            } else {
                //为了匹配索引 pu
                if (StringUtils.isNotBlank(userPhoneContacts.getPhone())) {
                    String phone = "'" + userPhoneContacts.getPhone() + "'";
                    sb.append(" and phone =  ").append(phone);
                }
                sb.append(" and userId !=  ").append(userPhoneContacts.getUserId());
                if (StringUtils.isNotBlank(userPhoneContacts.getImei())) {
                    String imei = "'" + userPhoneContacts.getImei() + "'";
                    sb.append(" and imei = ").append(imei);
                }
            }
        }else{
        	return null;
        }

        if (sort)
            sb.append(" order by callTimes desc");
        sb.append("  limit 80000  ");
        String sql = sb.toString();
        logger.warn("【HBase】进入查询(core0)方法,sql={}", sql);
        List
  
    userPhoneContactsList = null; try { long startTime = System.currentTimeMillis(); userPhoneContactsList = jdbcTemplate.query(sql, new BeanPropertyRowMapper
   
    (UserPhoneContacts.class)); long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; logger.info("【HBase】执行查询(core0)成功,sql:" + sql + ",耗时:" + excTime); return userPhoneContactsList; }catch (Exception e) { logger.error("【HBase】执行自带条件查询(core0)失败,sql:" + sql + ",失败原因:" + e.toString(), e); e.printStackTrace(); } return null; } @Override /** * ScheduledTasks->newYearOverdue30to60Message 定时任务 注释掉 暂不实现 */ public Page
    
      query(UserPhoneContacts userPhoneContacts, Page
     
       page) { logger.warn("【HBase】进入分页查询(core0)方法"); if (userPhoneContacts == null || page == null || (userPhoneContacts.getUserId() == null && StringUtils.isBlank(userPhoneContacts.getPhone()) && StringUtils.isBlank(userPhoneContacts.getImei()))) { logger.error("【HBase】(core0)入参userPhoneContacts或page为NULL或一个参数都没传"); return null; } StringBuffer sb = new StringBuffer( " select id,userId,imei,phone,name,callTimes,lastCall as lastCallTwo from phoneContacts where 1=1 "); String pageSql = "select count(1) from phoneContacts where 1=1 "; if (userPhoneContacts.getUserId() != null) { sb.append(" and userId = ").append(userPhoneContacts.getUserId()); pageSql += " and userId = " + userPhoneContacts.getUserId(); } if (StringUtils.isNotBlank(userPhoneContacts.getPhone())) { String phone = "'" + userPhoneContacts.getPhone() + "'"; sb.append(" and phone = ").append(phone); pageSql += " and phone = " + page; } if (StringUtils.isNotBlank(userPhoneContacts.getImei())) { String imei = "'" + userPhoneContacts.getImei() + "'"; sb.append(" and imei = ").append(imei); pageSql += " and imei = " + imei; } sb.append(" order by callTimes desc "); String sql = sb.toString(); sql = sql + " limit " + page.getPageSize(); logger.warn("【HBase】进入分页查询(core0)方法,sql={}", sql); List
      
        userPhoneContactsList = null; try { long startTime = System.currentTimeMillis(); logger.info("【HBase】执行分页查询Result(core0)成功,sql:" + sql); userPhoneContactsList = jdbcTemplate.query(sql, new BeanPropertyRowMapper
       
        (UserPhoneContacts.class)); logger.info("【HBase】执行分页查询PageSize(core0)成功,sql:" + pageSql); long totalCount = jdbcTemplate.queryForObject(pageSql, Long.class); long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; page.setTotalCount(totalCount); page.setResult(userPhoneContactsList); logger.info("【HBase】执行分页查询(core0)成功,sql:" + sql + ",耗时:" + excTime); } catch (Exception e) { logger.error("【HBase】执行分页查询PageSize(core0)失败,sql:" + pageSql, e); e.printStackTrace(); } return page; } @Override public List
        
          query(String whereSql) { logger.warn("【HBase】进入自带条件查询(core0)方法"); if (StringUtils.isBlank(whereSql)) { logger.error("【HBase】(core0)入参whereSql为NULL"); return null; } StringBuffer sb = new StringBuffer( " select id,userId,imei,phone,name,callTimes,lastCall as lastCallTwo from phoneContacts "); String sql = sb.append(whereSql).toString(); logger.warn("【HBase】进入自带条件查询(core0)方法,sql={}", sql); List
         
           userPhoneContactsList = null; try { long startTime = System.currentTimeMillis(); userPhoneContactsList = jdbcTemplate.query(sql, new BeanPropertyRowMapper
          
           (UserPhoneContacts.class)); long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; logger.warn("【HBase】执行自带条件查询(core0)成功,sql:" + sql + ",耗时:" + excTime); return userPhoneContactsList; } catch (Exception e) { logger.error("【HBase】执行自带条件查询(core0)失败,sql:" + sql + ",失败原因:" + e.toString(), e); e.printStackTrace(); } return null; } @Override public void deleteById(UserPhoneContacts userPhoneContacts) { logger.warn("【HBase】进入删除(core0)方法"); if (userPhoneContacts == null || userPhoneContacts.getId() == null) { logger.error("【HBase】(core0)入参userPhoneContacts为NULL"); return; } String sql = "delete from phoneContacts where Id = '" + userPhoneContacts.getId() +"'"; logger.warn("【HBase】进入删除(core0)方法,sql={}", sql); try { long startTime = System.currentTimeMillis(); jdbcTemplate.update(sql); long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; logger.info("【HBase】执行删除(core0)成功,sql:" + sql + ",耗时:" + excTime); } catch (Exception e) { logger.error("【HBase】执行删除(core0)失败,sql:" + sql + ",失败原因:" + e.toString(), e); e.printStackTrace(); } } @Override public void add(List
           
             userPhoneContactsList) { logger.warn("【HBase】进入添加(core0)方法"); if (CollectionUtils.isEmpty(userPhoneContactsList)) { logger.error("【HBase】(core0)入参userPhoneContactsList为NULL"); return; } String sql = "upsert into phoneContacts values (?,?,?,?,?,?,?,?)"; logger.warn("【HBase】进入添加(core0)方法,userId={},sql={}", userPhoneContactsList.get(0).getUserId(), sql); Object args[] = new Object[8]; long startTime = System.currentTimeMillis(); int successNum = 0; int filterNum = 0; try{ for (int i = 0; i < userPhoneContactsList.size(); i++) { //过滤空数据 try { if (StringUtils.isBlank(StringFilter(userPhoneContactsList.get(i).getImei())) || StringUtils.isBlank(StringFilter(userPhoneContactsList.get(i).getName())) || StringUtils.isBlank(StringFilter(userPhoneContactsList.get(i).getPhone()))) { filterNum ++; continue; } String uuid = UUID.randomUUID().toString().replaceAll("-", "").substring(0, 16); args[0] = uuid; args[1] = userPhoneContactsList.get(i).getUserId(); args[2] = StringFilter(userPhoneContactsList.get(i).getImei()); args[3] = StringFilter(userPhoneContactsList.get(i).getImsi()); args[4] = StringFilter(userPhoneContactsList.get(i).getPhone()); args[5] = StringFilter(userPhoneContactsList.get(i).getName()); args[6] = userPhoneContactsList.get(i).getCallTimes()==null?0L:userPhoneContactsList.get(i).getCallTimes(); // 由于最初用date.toLocalString(),保存入库的格式为Jun 3, 2017 3:05:54 PM, // 为了保持该表数据的一致性,并且date.toLocalString()由于环境不同,会有不同的格式,故用下列方式处理 args[7] = df.format(userPhoneContactsList.get(i).getLastCall()); jdbcTemplate.update(sql, args); successNum++; } catch (Exception e) { logger.error("【HBase】执行添加(core0)失败,sql:" + sql + ",失败原因:" + e.toString(), e); } } long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; try{ logger.warn("【HBase】进入批量插入(core0)方法,user_id={},成功={},过滤={},耗时={}", userPhoneContactsList.get(0).getUserId(), successNum, filterNum, excTime); }catch(Exception e){ logger.error("【HBase】添加日志错误",e); } }catch (Exception e) { 
logger.error("【HBase】core5 add for循环异常:", e); } } @Override public Long count(UserPhoneContacts userPhoneContacts) { logger.warn("【HBase】进入计数(core0)方法"); if (userPhoneContacts == null || userPhoneContacts.getUserId() == null) { return 0L; } String sql = "select count(1) from phoneContacts where userId = " + userPhoneContacts.getUserId(); logger.warn("【HBase】进入计数(core0)方法,sql={}", sql); if (StringUtils.isNotBlank(userPhoneContacts.getImei())) { sql = sql + " and imei = '" + userPhoneContacts.getImei() + "'"; } try { long startTime = System.currentTimeMillis(); Long count = jdbcTemplate.queryForObject(sql, Long.class); long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; logger.info("执行sql:" + sql + "耗时:" + excTime); return count; } catch (Exception e) { logger.error("执行失败sql:" + sql + "执行失败原因:" + e.toString(), e); e.printStackTrace(); } return 0L; } @Override public void updateById(List
            
              userPhoneContactsList) { logger.warn("【HBase】进入修改(core0)方法"); if (CollectionUtils.isEmpty(userPhoneContactsList)) { logger.error("【HBase】(core0)修改入参userPhoneContactsList为NULL"); return; } String sql = ""; logger.warn("【HBase】进入修改(core0)方法,userId={},sql={}", userPhoneContactsList.get(0).getUserId(), sql); long startTime = System.currentTimeMillis(); UserPhoneContacts upc = null; String lastCall = null; for (int i = 0; i < userPhoneContactsList.size(); i++) { upc = userPhoneContactsList.get(i); if(upc.getLastCall() != null){ lastCall = df.format(upc.getLastCall()); }else{ lastCall = df.format(new Date(0)); } String name = StringFilter(upc.getName().trim()); if(name==""){ continue; }else{ sql = "upsert into phoneContacts(id,userid,name,callTimes,lastCall) values('" + upc.getId() + "'," + upc.getUserId() + ",'" + StringFilter(upc.getName()) + "'," + upc.getCallTimes() + ",'" + lastCall + "')"; try { jdbcTemplate.update(sql); } catch (Exception e) { logger.error("【HBase】执行修改(core0)失败,sql:" + sql + ",失败原因:" + e.toString(), e); e.printStackTrace(); } } } long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; try{ logger.warn("【HBase】进入批量修改(core0)方法,user_id={},size={},耗时={}", userPhoneContactsList.get(0).getUserId(), userPhoneContactsList.size(), excTime); }catch(Exception e){ logger.error("【HBase】添加日志错误",e); } } // 正则表达式 新增时 去除特殊字符串 public String StringFilter(String str) throws PatternSyntaxException { if (str == null) { str = ""; } String regEx = "[`~!@#$%^&*()+=|{}':;',//[//].<>/?~!@#¥%……&*()——+|{}【】‘;:”“’。,、?\\\\]"; Pattern p = Pattern.compile(regEx); Matcher m = p.matcher(str); return m.replaceAll("").trim(); } }
            
           
          
         
        
       
      
     
    
   
  
 
创建 phoenix连接
package com.mimidai.common.utils.hbase;

import com.mimidai.common.utils.PropertiesUtils;

import org.apache.commons.dbcp.BasicDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;


/**
 * Spring configuration that exposes a {@link JdbcTemplate} talking to Phoenix.
 */
@Configuration
public class PhoenixConfig {

    /**
     * Creates a DBCP data source for the Phoenix JDBC driver.
     *
     * <p>The JDBC URL is read from the {@code phoenix_url} property so that it is
     * not hard-coded per environment. This method is not declared as a bean, so
     * every invocation builds a new data source.
     *
     * @return a data source configured with the Phoenix driver and URL
     */
    public BasicDataSource phoenixDataSource() {
        final String phoenixUrl = PropertiesUtils.getProperties().getProperty("phoenix_url");

        final BasicDataSource phoenix = new BasicDataSource();
        phoenix.setDriverClassName("org.apache.phoenix.jdbc.PhoenixDriver");
        phoenix.setUrl(phoenixUrl);
        return phoenix;
    }

    /**
     * Registers the {@code phoenixJdbcTemplate} bean backed by
     * {@link #phoenixDataSource()}.
     *
     * @return the JdbcTemplate used for Phoenix access
     */
    @Bean
    public JdbcTemplate phoenixJdbcTemplate() {
        final JdbcTemplate template = new JdbcTemplate();
        template.setDataSource(phoenixDataSource());
        return template;
    }

}
实体类(@Field 为 Solr 注解,@Expose 为 Gson 注解)
package com.mimidai.common.entity.user;

import com.google.gson.annotations.Expose;
import java.util.Date;

import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.beans.Field;

import com.mimidai.common.entity.sys.ExcludePhone;

/**
 * User phone-contacts (address book) record.
 *
 * <p>Fields carry the Solr {@code @Field} and Gson {@code @Expose} annotations.
 *
 * @author 李兴达
 * @date 2015-12-25 10:14:01
 */
public class UserPhoneContacts {
	@Field
	@Expose(serialize = false)
	private String id;
	/** User id. */
	@Field
	@Expose
	private Long userId;
	/** IMEI of the phone. */
	@Field
	@Expose
	private String imei;
	/** IMSI of the phone. */
	@Field
	@Expose
	private String imsi;
	/** Phone number of the contact. */
	@Field
	@Expose
	private String phone;
	/** Name of the contact. */
	@Field
	@Expose
	private String name;
	/** Number of calls. */
	@Field
	@Expose
	private Long callTimes;
	/** Time of the last call. */
	@Field
	@Expose
	private Date lastCall;
	/** Update time. */
	@Expose
	private Date updateTime;
	/** State. */
	@Expose
	private String state;
	/** Sensitive marker. */
	@Field
	@Expose
	private String sensitive;
	@Field
	@Expose(serialize = false)
	private String version;

	/** Textual form of the last call time; used by {@link #getLastCall()} as a fallback. */
	@Expose
	private String lastCallTwo;

	/** Whether the phone number duplicates one in the communication table. */
	@Expose
	private boolean repetition;
	/** Id of the loan application. */
	@Field
	@Expose
	private String loanApplyId;

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public Long getUserId() {
		return userId;
	}

	public void setUserId(Long userId) {
		this.userId = userId;
	}

	public String getImei() {
		return imei;
	}

	public void setImei(String imei) {
		this.imei = imei;
	}

	public String getImsi() {
		return imsi;
	}

	public void setImsi(String imsi) {
		this.imsi = imsi;
	}

	public String getPhone() {
		return phone;
	}

	public void setPhone(String phone) {
		this.phone = phone;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public Long getCallTimes() {
		return callTimes;
	}

	public void setCallTimes(Long callTimes) {
		this.callTimes = callTimes;
	}

	/**
	 * Returns the last call time, never {@code null}.
	 *
	 * <p>When {@code lastCall} is unset, the value is parsed from
	 * {@code lastCallTwo}; if that is blank or cannot be parsed, the epoch
	 * ({@code new Date(0)}) is returned.
	 *
	 * @return the last call time or the fallback described above
	 */
	public Date getLastCall() {
		if (this.lastCall == null) {
			Date date = new Date(0);
			try {
				if (StringUtils.isNotBlank(lastCallTwo)) {
					// Deprecated lenient parser kept on purpose: changing the parser
					// could reject text forms that are currently accepted.
					date = new Date(lastCallTwo);
				}
			} catch (Exception e) {
				// Unparseable text falls back to the epoch.
				date = new Date(0);
			}
			return date;
		} else {
			return this.lastCall;
		}
	}

	public void setLastCall(Date lastCall) {
		this.lastCall = lastCall;
	}

	public String getLastCallTwo() {
		return lastCallTwo;
	}

	public void setLastCallTwo(String lastCallTwo) {
		this.lastCallTwo = lastCallTwo;
	}

	public Date getUpdateTime() {
		return updateTime;
	}

	public void setUpdateTime(Date updateTime) {
		this.updateTime = updateTime;
	}

	public String getState() {
		return state;
	}

	public void setState(String state) {
		this.state = state;
	}

	public String getSensitive() {
		return sensitive;
	}

	public void setSensitive(String sensitive) {
		this.sensitive = sensitive;
	}

	/**
	 * Compares against an {@link ExcludePhone} by phone number; any other object
	 * is compared by identity.
	 *
	 * <p>A contact without a phone number is never equal to an ExcludePhone
	 * (previously this case threw a NullPointerException).
	 *
	 * <p>NOTE(review): this relation is not symmetric unless ExcludePhone
	 * implements the mirror comparison, and {@code hashCode} is not overridden, so
	 * it should not be relied on in hash-based collections.
	 */
	@Override
	public boolean equals(Object obj) {
		if (obj instanceof ExcludePhone) {
			ExcludePhone ep = (ExcludePhone) obj;
			return this.phone != null && this.phone.equals(ep.getPhone());
		}
		return super.equals(obj);
	}

	public boolean isRepetition() {
		return repetition;
	}

	public void setRepetition(boolean repetition) {
		this.repetition = repetition;
	}

	public String getLoanApplyId() {
		return loanApplyId;
	}

	public void setLoanApplyId(String loanApplyId) {
		this.loanApplyId = loanApplyId;
	}

	public String getVersion() {
		return version;
	}

	public void setVersion(String version) {
		this.version = version;
	}
}
网友评论