接口(HBaseUserPhoneContactsDao)
package com.mimidai.common.dao.hbase;
import java.util.List;
import com.mimidai.common.entity.user.UserPhoneContacts;
import com.mimidai.common.utils.persistence.Page;
/**
 * DAO interface for the user phone-contacts (address book) table, labelled "core0".
 * <p>
 * The original header also lists "cms query count delete".
 * NOTE(review): presumably the operations used by the cms module - confirm.
 * <p>
 * NOTE(review): the generic type arguments below were restored; the element type
 * is established by the implementation, which calls {@code getUserId()} on list
 * elements and maps rows to {@link UserPhoneContacts}.
 *
 * @author EVE
 */
public interface HBaseUserPhoneContactsDao {

    /**
     * Queries contacts using the fields of the given example object as filters.
     *
     * @param userPhoneContacts example object carrying the filter values
     * @param disableUserId     whether to exclude (rather than match) the rows of this userId
     * @param sort              whether the result should be sorted in descending order
     * @return the matching contacts
     */
    List<UserPhoneContacts> query(UserPhoneContacts userPhoneContacts, boolean disableUserId,
            boolean sort);

    /**
     * Paged variant of the example-based query.
     *
     * @param userPhoneContacts example object carrying the filter values
     * @param page              paging descriptor
     * @return the page
     */
    Page<UserPhoneContacts> query(UserPhoneContacts userPhoneContacts, Page<UserPhoneContacts> page);

    /**
     * Queries contacts with a caller-supplied SQL fragment.
     *
     * @param whereSql the statement text that is appended after the table name
     *                 (the "where ..." part and anything following it)
     * @return the matching contacts
     */
    List<UserPhoneContacts> query(String whereSql);

    /**
     * Deletes the row identified by the id of the given contact.
     *
     * @param userPhoneContacts contact whose id selects the row to delete
     */
    void deleteById(UserPhoneContacts userPhoneContacts);

    /**
     * Inserts the given contacts.
     *
     * @param userPhoneContactsList contacts to insert
     */
    void add(List<UserPhoneContacts> userPhoneContactsList);

    /**
     * Counts the contacts matching the given example object.
     *
     * @param userPhoneContacts example object carrying the filter values
     * @return number of matching rows
     */
    Long count(UserPhoneContacts userPhoneContacts);

    /**
     * Updates contacts by id.
     *
     * @param userPhoneContactsList contacts to update; each is addressed by its id
     */
    void updateById(List<UserPhoneContacts> userPhoneContactsList);
}
接口实现类
package com.mimidai.common.utils.hbase;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import com.mimidai.common.dao.hbase.HBaseUserPhoneContactsDao;
import com.mimidai.common.entity.user.UserPhoneContacts;
import com.mimidai.common.utils.persistence.Page;
@Repository
public class HBaseUserPhoneContactsDaoImpl implements HBaseUserPhoneContactsDao {
private static Logger logger = LoggerFactory
.getLogger("log.hbase.HBaseUserPhoneContactsDaoImpl");
@Autowired
@Qualifier("phoenixJdbcTemplate")
private JdbcTemplate jdbcTemplate;
private SimpleDateFormat df = new SimpleDateFormat("MMM d, yyyy h:mm:ss aaa",Locale.ENGLISH);
@Override
public List
query(UserPhoneContacts userPhoneContacts, boolean disableUserId,
boolean sort) {
logger.warn("【HBase】进入查询(core0)方法");
if (userPhoneContacts == null) {
logger.error("【HBase】(core0)入参userPhoneContacts为NULL");
return null;
}
StringBuffer sb = new StringBuffer(
" select id,userId,imei,phone,name,callTimes,lastCall as lastCallTwo from phoneContacts where 1=1 ");
if (userPhoneContacts.getUserId() != null) {
if (!disableUserId) {
sb.append(" and userId = ").append(userPhoneContacts.getUserId());
if (StringUtils.isNotBlank(userPhoneContacts.getPhone())) {
String phone = "'" + userPhoneContacts.getPhone() + "'";
sb.append(" and phone = ").append(phone);
}
if (StringUtils.isNotBlank(userPhoneContacts.getImei())) {
String imei = "'" + userPhoneContacts.getImei() + "'";
sb.append(" and imei = ").append(imei);
}
} else {
//为了匹配索引 pu
if (StringUtils.isNotBlank(userPhoneContacts.getPhone())) {
String phone = "'" + userPhoneContacts.getPhone() + "'";
sb.append(" and phone = ").append(phone);
}
sb.append(" and userId != ").append(userPhoneContacts.getUserId());
if (StringUtils.isNotBlank(userPhoneContacts.getImei())) {
String imei = "'" + userPhoneContacts.getImei() + "'";
sb.append(" and imei = ").append(imei);
}
}
}else{
return null;
}
if (sort)
sb.append(" order by callTimes desc");
sb.append(" limit 80000 ");
String sql = sb.toString();
logger.warn("【HBase】进入查询(core0)方法,sql={}", sql);
List
userPhoneContactsList = null; try { long startTime = System.currentTimeMillis(); userPhoneContactsList = jdbcTemplate.query(sql, new BeanPropertyRowMapper
(UserPhoneContacts.class)); long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; logger.info("【HBase】执行查询(core0)成功,sql:" + sql + ",耗时:" + excTime); return userPhoneContactsList; }catch (Exception e) { logger.error("【HBase】执行自带条件查询(core0)失败,sql:" + sql + ",失败原因:" + e.toString(), e); e.printStackTrace(); } return null; } @Override /** * ScheduledTasks->newYearOverdue30to60Message 定时任务 注释掉 暂不实现 */ public Page
query(UserPhoneContacts userPhoneContacts, Page
page) { logger.warn("【HBase】进入分页查询(core0)方法"); if (userPhoneContacts == null || page == null || (userPhoneContacts.getUserId() == null && StringUtils.isBlank(userPhoneContacts.getPhone()) && StringUtils.isBlank(userPhoneContacts.getImei()))) { logger.error("【HBase】(core0)入参userPhoneContacts或page为NULL或一个参数都没传"); return null; } StringBuffer sb = new StringBuffer( " select id,userId,imei,phone,name,callTimes,lastCall as lastCallTwo from phoneContacts where 1=1 "); String pageSql = "select count(1) from phoneContacts where 1=1 "; if (userPhoneContacts.getUserId() != null) { sb.append(" and userId = ").append(userPhoneContacts.getUserId()); pageSql += " and userId = " + userPhoneContacts.getUserId(); } if (StringUtils.isNotBlank(userPhoneContacts.getPhone())) { String phone = "'" + userPhoneContacts.getPhone() + "'"; sb.append(" and phone = ").append(phone); pageSql += " and phone = " + page; } if (StringUtils.isNotBlank(userPhoneContacts.getImei())) { String imei = "'" + userPhoneContacts.getImei() + "'"; sb.append(" and imei = ").append(imei); pageSql += " and imei = " + imei; } sb.append(" order by callTimes desc "); String sql = sb.toString(); sql = sql + " limit " + page.getPageSize(); logger.warn("【HBase】进入分页查询(core0)方法,sql={}", sql); List
userPhoneContactsList = null; try { long startTime = System.currentTimeMillis(); logger.info("【HBase】执行分页查询Result(core0)成功,sql:" + sql); userPhoneContactsList = jdbcTemplate.query(sql, new BeanPropertyRowMapper
(UserPhoneContacts.class)); logger.info("【HBase】执行分页查询PageSize(core0)成功,sql:" + pageSql); long totalCount = jdbcTemplate.queryForObject(pageSql, Long.class); long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; page.setTotalCount(totalCount); page.setResult(userPhoneContactsList); logger.info("【HBase】执行分页查询(core0)成功,sql:" + sql + ",耗时:" + excTime); } catch (Exception e) { logger.error("【HBase】执行分页查询PageSize(core0)失败,sql:" + pageSql, e); e.printStackTrace(); } return page; } @Override public List
query(String whereSql) { logger.warn("【HBase】进入自带条件查询(core0)方法"); if (StringUtils.isBlank(whereSql)) { logger.error("【HBase】(core0)入参whereSql为NULL"); return null; } StringBuffer sb = new StringBuffer( " select id,userId,imei,phone,name,callTimes,lastCall as lastCallTwo from phoneContacts "); String sql = sb.append(whereSql).toString(); logger.warn("【HBase】进入自带条件查询(core0)方法,sql={}", sql); List
userPhoneContactsList = null; try { long startTime = System.currentTimeMillis(); userPhoneContactsList = jdbcTemplate.query(sql, new BeanPropertyRowMapper
(UserPhoneContacts.class)); long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; logger.warn("【HBase】执行自带条件查询(core0)成功,sql:" + sql + ",耗时:" + excTime); return userPhoneContactsList; } catch (Exception e) { logger.error("【HBase】执行自带条件查询(core0)失败,sql:" + sql + ",失败原因:" + e.toString(), e); e.printStackTrace(); } return null; } @Override public void deleteById(UserPhoneContacts userPhoneContacts) { logger.warn("【HBase】进入删除(core0)方法"); if (userPhoneContacts == null || userPhoneContacts.getId() == null) { logger.error("【HBase】(core0)入参userPhoneContacts为NULL"); return; } String sql = "delete from phoneContacts where Id = '" + userPhoneContacts.getId() +"'"; logger.warn("【HBase】进入删除(core0)方法,sql={}", sql); try { long startTime = System.currentTimeMillis(); jdbcTemplate.update(sql); long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; logger.info("【HBase】执行删除(core0)成功,sql:" + sql + ",耗时:" + excTime); } catch (Exception e) { logger.error("【HBase】执行删除(core0)失败,sql:" + sql + ",失败原因:" + e.toString(), e); e.printStackTrace(); } } @Override public void add(List
userPhoneContactsList) { logger.warn("【HBase】进入添加(core0)方法"); if (CollectionUtils.isEmpty(userPhoneContactsList)) { logger.error("【HBase】(core0)入参userPhoneContactsList为NULL"); return; } String sql = "upsert into phoneContacts values (?,?,?,?,?,?,?,?)"; logger.warn("【HBase】进入添加(core0)方法,userId={},sql={}", userPhoneContactsList.get(0).getUserId(), sql); Object args[] = new Object[8]; long startTime = System.currentTimeMillis(); int successNum = 0; int filterNum = 0; try{ for (int i = 0; i < userPhoneContactsList.size(); i++) { //过滤空数据 try { if (StringUtils.isBlank(StringFilter(userPhoneContactsList.get(i).getImei())) || StringUtils.isBlank(StringFilter(userPhoneContactsList.get(i).getName())) || StringUtils.isBlank(StringFilter(userPhoneContactsList.get(i).getPhone()))) { filterNum ++; continue; } String uuid = UUID.randomUUID().toString().replaceAll("-", "").substring(0, 16); args[0] = uuid; args[1] = userPhoneContactsList.get(i).getUserId(); args[2] = StringFilter(userPhoneContactsList.get(i).getImei()); args[3] = StringFilter(userPhoneContactsList.get(i).getImsi()); args[4] = StringFilter(userPhoneContactsList.get(i).getPhone()); args[5] = StringFilter(userPhoneContactsList.get(i).getName()); args[6] = userPhoneContactsList.get(i).getCallTimes()==null?0L:userPhoneContactsList.get(i).getCallTimes(); // 由于最初用date.toLocalString(),保存入库的格式为Jun 3, 2017 3:05:54 PM, // 为了保持该表数据的一致性,并且date.toLocalString()由于环境不同,会有不同的格式,故用下列方式处理 args[7] = df.format(userPhoneContactsList.get(i).getLastCall()); jdbcTemplate.update(sql, args); successNum++; } catch (Exception e) { logger.error("【HBase】执行添加(core0)失败,sql:" + sql + ",失败原因:" + e.toString(), e); } } long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; try{ logger.warn("【HBase】进入批量插入(core0)方法,user_id={},成功={},过滤={},耗时={}", userPhoneContactsList.get(0).getUserId(), successNum, filterNum, excTime); }catch(Exception e){ logger.error("【HBase】添加日志错误",e); } }catch (Exception e) { 
logger.error("【HBase】core5 add for循环异常:", e); } } @Override public Long count(UserPhoneContacts userPhoneContacts) { logger.warn("【HBase】进入计数(core0)方法"); if (userPhoneContacts == null || userPhoneContacts.getUserId() == null) { return 0L; } String sql = "select count(1) from phoneContacts where userId = " + userPhoneContacts.getUserId(); logger.warn("【HBase】进入计数(core0)方法,sql={}", sql); if (StringUtils.isNotBlank(userPhoneContacts.getImei())) { sql = sql + " and imei = '" + userPhoneContacts.getImei() + "'"; } try { long startTime = System.currentTimeMillis(); Long count = jdbcTemplate.queryForObject(sql, Long.class); long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; logger.info("执行sql:" + sql + "耗时:" + excTime); return count; } catch (Exception e) { logger.error("执行失败sql:" + sql + "执行失败原因:" + e.toString(), e); e.printStackTrace(); } return 0L; } @Override public void updateById(List
userPhoneContactsList) { logger.warn("【HBase】进入修改(core0)方法"); if (CollectionUtils.isEmpty(userPhoneContactsList)) { logger.error("【HBase】(core0)修改入参userPhoneContactsList为NULL"); return; } String sql = ""; logger.warn("【HBase】进入修改(core0)方法,userId={},sql={}", userPhoneContactsList.get(0).getUserId(), sql); long startTime = System.currentTimeMillis(); UserPhoneContacts upc = null; String lastCall = null; for (int i = 0; i < userPhoneContactsList.size(); i++) { upc = userPhoneContactsList.get(i); if(upc.getLastCall() != null){ lastCall = df.format(upc.getLastCall()); }else{ lastCall = df.format(new Date(0)); } String name = StringFilter(upc.getName().trim()); if(name==""){ continue; }else{ sql = "upsert into phoneContacts(id,userid,name,callTimes,lastCall) values('" + upc.getId() + "'," + upc.getUserId() + ",'" + StringFilter(upc.getName()) + "'," + upc.getCallTimes() + ",'" + lastCall + "')"; try { jdbcTemplate.update(sql); } catch (Exception e) { logger.error("【HBase】执行修改(core0)失败,sql:" + sql + ",失败原因:" + e.toString(), e); e.printStackTrace(); } } } long endTime = System.currentTimeMillis(); float excTime = (float) (endTime - startTime) / 1000; try{ logger.warn("【HBase】进入批量修改(core0)方法,user_id={},size={},耗时={}", userPhoneContactsList.get(0).getUserId(), userPhoneContactsList.size(), excTime); }catch(Exception e){ logger.error("【HBase】添加日志错误",e); } } // 正则表达式 新增时 去除特殊字符串 public String StringFilter(String str) throws PatternSyntaxException { if (str == null) { str = ""; } String regEx = "[`~!@#$%^&*()+=|{}':;',//[//].<>/?~!@#¥%……&*()——+|{}【】‘;:”“’。,、?\\\\]"; Pattern p = Pattern.compile(regEx); Matcher m = p.matcher(str); return m.replaceAll("").trim(); } }
创建 Phoenix 连接
package com.mimidai.common.utils.hbase;
import com.mimidai.common.utils.PropertiesUtils;
import org.apache.commons.dbcp.BasicDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
/**
 * Spring configuration that exposes a {@link JdbcTemplate} backed by the
 * Apache Phoenix JDBC driver under the bean name {@code phoenixJdbcTemplate}.
 */
@Configuration
public class PhoenixConfig {

    /**
     * Creates a DBCP data source for Phoenix.
     * <p>
     * The JDBC URL is not hard-coded: it is read from the {@code phoenix_url}
     * entry of the properties returned by {@code PropertiesUtils}, so each
     * environment (production, test) supplies its own address.
     * This method carries no {@code @Bean} annotation.
     *
     * @return a new, configured data source
     */
    public BasicDataSource phoenixDataSource() {
        final String phoenixUrl = PropertiesUtils.getProperties().getProperty("phoenix_url");

        final BasicDataSource phoenixSource = new BasicDataSource();
        phoenixSource.setDriverClassName("org.apache.phoenix.jdbc.PhoenixDriver");
        phoenixSource.setUrl(phoenixUrl);
        return phoenixSource;
    }

    /**
     * Creates the {@code phoenixJdbcTemplate} bean on top of {@link #phoenixDataSource()}.
     *
     * @return the template used to run Phoenix SQL
     */
    @Bean
    public JdbcTemplate phoenixJdbcTemplate() {
        final JdbcTemplate template = new JdbcTemplate();
        template.setDataSource(phoenixDataSource());
        return template;
    }
}
实体类(@Field 为 Solr 注解,@Expose 为 Gson 注解)
package com.mimidai.common.entity.user;
import com.google.gson.annotations.Expose;
import java.util.Date;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.beans.Field;
import com.mimidai.common.entity.sys.ExcludePhone;
/**
 * Entity for the user phone-contacts (address book) table.
 * <p>
 * Fields annotated with {@code @Field} are mapped by Solr; {@code @Expose}
 * controls Gson (de)serialization.
 *
 * @author 李兴达
 * @date 2015-12-25 10:14:01
 */
public class UserPhoneContacts {

    @Field
    @Expose(serialize = false)
    private String id;

    /** User id. */
    @Field
    @Expose
    private Long userId;

    /** IMEI of the phone. */
    @Field
    @Expose
    private String imei;

    /** IMSI of the phone. */
    @Field
    @Expose
    private String imsi;

    /** Phone number of the contact. */
    @Field
    @Expose
    private String phone;

    /** Name of the contact. */
    @Field
    @Expose
    private String name;

    /** Number of calls. */
    @Field
    @Expose
    private Long callTimes;

    /** Time of the last call. */
    @Field
    @Expose
    private Date lastCall;

    /** Update time. */
    @Expose
    private Date updateTime;

    /** State. */
    @Expose
    private String state;

    /** Sensitive marker. */
    @Field
    @Expose
    private String sensitive;

    @Field
    @Expose(serialize = false)
    private String version;

    /** Textual form of the last-call time; used by {@link #getLastCall()} when {@code lastCall} is unset. */
    @Expose
    private String lastCallTwo;

    /** Whether the phone number duplicates one in the communication table. */
    @Expose
    private boolean repetition;

    /** Id in the loan-application table. */
    @Field
    @Expose
    private String loanApplyId;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getImei() {
        return imei;
    }

    public void setImei(String imei) {
        this.imei = imei;
    }

    public String getImsi() {
        return imsi;
    }

    public void setImsi(String imsi) {
        this.imsi = imsi;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Long getCallTimes() {
        return callTimes;
    }

    public void setCallTimes(Long callTimes) {
        this.callTimes = callTimes;
    }

    /**
     * Returns the last-call time, never {@code null}.
     * <p>
     * When {@code lastCall} is unset, the value is parsed from {@code lastCallTwo};
     * if that is blank or cannot be parsed, the epoch ({@code new Date(0)}) is returned.
     *
     * @return the last-call time or the epoch as a fallback
     */
    @SuppressWarnings("deprecation")
    public Date getLastCall() {
        if (this.lastCall != null) {
            return this.lastCall;
        }
        if (StringUtils.isNotBlank(lastCallTwo)) {
            try {
                // NOTE(review): Date(String) is deprecated; it is kept because its lenient
                // parsing is what existing stored values rely on - replace only after
                // confirming every stored format.
                return new Date(lastCallTwo);
            } catch (Exception e) {
                // Unparseable text falls back to the epoch, as before.
                return new Date(0);
            }
        }
        return new Date(0);
    }

    public void setLastCall(Date lastCall) {
        this.lastCall = lastCall;
    }

    public String getLastCallTwo() {
        return lastCallTwo;
    }

    public void setLastCallTwo(String lastCallTwo) {
        this.lastCallTwo = lastCallTwo;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public String getSensitive() {
        return sensitive;
    }

    public void setSensitive(String sensitive) {
        this.sensitive = sensitive;
    }

    /**
     * Compares by phone number when the other object is an {@link ExcludePhone};
     * otherwise falls back to identity equality.
     * <p>
     * A contact without a phone number is never equal to an {@code ExcludePhone}
     * (previously this case threw a {@code NullPointerException}).
     * <p>
     * NOTE(review): this relation is not symmetric unless {@code ExcludePhone} mirrors it,
     * and {@code hashCode()} is not overridden, so it is only safe for linear
     * {@code equals}-based lookups, not for hash-based collections.
     */
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof ExcludePhone) {
            ExcludePhone ep = (ExcludePhone) obj;
            return this.phone != null && this.phone.equals(ep.getPhone());
        }
        return super.equals(obj);
    }

    public boolean isRepetition() {
        return repetition;
    }

    public void setRepetition(boolean repetition) {
        this.repetition = repetition;
    }

    public String getLoanApplyId() {
        return loanApplyId;
    }

    public void setLoanApplyId(String loanApplyId) {
        this.loanApplyId = loanApplyId;
    }

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }
}
