构建一个大数据量的搜索引擎,数据很重要,数据来源在哪里呢?一方面可以从站内结构化数据库导入,如MySQL,Oracle等数据库,构建一个站内搜索引擎,提高查询速度.另一方面构建一个分布式爬虫,每天定时抓取数据,不断地添加到索引库,典型的如百度,谷歌等全文检索引擎.
我们现在要做的就是第二种.说难不难,下面直接展示具体实现.
1.定义一个实体,与索引库的type数据字段名一致.
package com.ytdx.entity; import java.io.Serializable; /** * ES 索引对应实体 * @author lhy * */ public class Blob implements Serializable { private Integer id; //文章id private String title; //文章标题 private String describe; //描述 private String url; //文章路径 private String ImageUrl; //图片路径 private String postInfo; //发布信息 public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getDescribe() { return describe; } public void setDescribe(String describe) { this.describe = describe; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getImageUrl() { return ImageUrl; } public void setImageUrl(String imageUrl) { ImageUrl = imageUrl; } public String getPostInfo() { return postInfo; } public void setPostInfo(String postInfo) { this.postInfo = postInfo; } @Override public String toString() { return "Blob [id=" + id + ", title=" + title + ", describe=" + describe + ", url=" + url + ", ImageUrl=" + ImageUrl + ", postInfo=" + postInfo + "]"; } }
2.ES服务操作的工具类
package com.ytdx.util; import java.net.InetAddress; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; /** * ES服务器操作工具类 * @author lhy * @date 2018.04.20 */ public class EsUtil { private Client client; private static String name = "127.0.0.1"; private static Integer port = 9300; // private static ObjectMapper objectMapper = new ObjectMapper(); static Settings set = Settings.builder().put("cluster.name", "elasticsearch").build(); //.put("client.transport.sniff",false). /** * 本地客户端连接ES服务器 * @return */ public static Client EsConnect(){ TransportClient client = null; try { client = new PreBuiltTransportClient(set) .addTransportAddress(new TransportAddress(InetAddress.getByName(name), port)); System.out.println("ES服务器连接成功!"); return client; } catch (Exception e) { // TODO Auto-generated catch block System.out.println("ES服务器连接失败!"); e.printStackTrace(); } return client; } /** * 关闭ES连接的客户端 * @param client */ public static void closeClient(Client client){ if(client != null){ client.close(); System.out.println("Client已关闭!"); } } }
3.索引库的批量操作
package com.ytdx.es; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.XContentFactory; import com.ytdx.entity.Blob; public class BulkIndex { private Client client; private String index = "home"; private String type = "blob"; public BulkIndex(Client client) { super(); this.client = client; } //批量添加索引 public void BulkAddBlob(Blob blob) throws Exception { BulkRequestBuilder builder = client.prepareBulk(); builder.add( client.prepareIndex(index, type) .setSource(XContentFactory.jsonBuilder().startObject().field("title", blob.getTitle()) .field("describe", blob.getDescribe()).field("url", blob.getUrl()) .field("ImageUrl", blob.getImageUrl()).field("postInfo", blob.getPostInfo()).endObject())); BulkResponse response = builder.execute().actionGet(); // for (BulkItemResponse item : response.getItems()) { // System.out.println("你的批量操作Response信息为: " + item.getResponse()); // } // 4.错误信息日志读取 if (response.hasFailures()) { // 可在这里对于失败请求进行处理 for (BulkItemResponse item : response.getItems()) { if (item.isFailed()) { System.out.println("失败信息:--------" + item.getFailureMessage()); } } } } //批量删除索引库 public void BulkDelIndex() throws Exception { DeleteIndexResponse Response = client.admin().indices().prepareDelete(index) .execute().actionGet(); System.out.println("索引删除成功!"); } //批量创建结构化索引库 public void BulkAdd() throws Exception { BulkRequestBuilder builder = client.prepareBulk(); builder.add(client.prepareIndex(index, type) .setSource(XContentFactory.jsonBuilder().startObject().startObject("properties") .startObject("id").field("type", "integer").field("store", "yes").endObject() .startObject("title").field("type", "text").field("store", "yes").endObject() .startObject("describe").field("type", "text").field("store", 
"yes").endObject() .startObject("url").field("type", "text").field("store", "yes").endObject() .startObject("ImageUrl").field("type", "text").field("store", "yes").endObject() .startObject("postInfo").field("type", "text").field("store", "yes").endObject() .endObject().endObject())); BulkResponse response = builder.execute().actionGet(); System.out.println("索引库创建成功!"); } }4.分布式爬虫定时抓取数据,构建大数据量的索引库
package com.ytdx.task; import org.elasticsearch.client.Client; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.jsoup.nodes.Element; import org.jsoup.select.Elements; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.ytdx.entity.Blob; import com.ytdx.es.BulkIndex; import com.ytdx.util.EsUtil; @Component("FetchDataTask") public class FetchData { @Scheduled(cron = "0 30 12 ? * *") // 每天上午12:30抓取一次数据 public void TimeTask() throws Exception { Client client = EsUtil.EsConnect(); BulkIndex bulk = new BulkIndex(client); bulk.BulkDelIndex(); //先删除之前的索引库 bulk.BulkAdd(); //结构化重建索引库 String URL = "https://www.cnblogs.com/"; Fetch(client,URL); // 先抓取首页数据 for(int l=2;l<=200; l++){ //抓取从第2页到200页的数据 String url = "https://www.cnblogs.com/#p"; url += l; Fetch(client,url); } System.out.println("系统抓取博客数据成功!"); } public void Fetch(Client client,String URL) throws Exception { // Client client = EsUtil.EsConnect(); BulkIndex bulk = new BulkIndex(client); Document doc = Jsoup.connect(URL).get(); Element e = doc.getElementById("post_list"); Elements es = e.children(); for (int i = 0; i < es.size(); i++) { Element nodes = es.get(i); Element item = nodes.getElementsByClass("post_item_body").first(); String title = item.getElementsByTag("h3").get(0).getElementsByTag("a").text(); String describe = item.getElementsByTag("p").text(); String url = item.getElementsByTag("h3").get(0).getElementsByTag("a").attr("href"); String ImageUrl = ""; if (item.getElementsByTag("p").get(0).getElementsByTag("a").size() >= 1) { ImageUrl = item.getElementsByTag("p").get(0).getElementsByTag("a").get(0).getElementsByTag("img").attr("src"); } String postInfo = item.getElementsByClass("post_item_foot").first().text(); //添加抓取数据到索引库 Blob blob = new Blob(); blob.setTitle(title); blob.setDescribe(describe); blob.setUrl(url); blob.setImageUrl(ImageUrl); blob.setPostInfo(postInfo); bulk.BulkAddBlob(blob); } } }
5.后台的数据检索功能实现
package com.ytdx.es; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import com.ytdx.entity.Blob; /** * Read side: converts ES search responses into Blob lists and runs * the match-all and multi-field keyword queries. */ public class EsQuery { private final Client client; private final String index = "home"; private final String type = "blob"; public EsQuery(Client client) { super(); this.client = client; } /** * Maps each hit's _source onto a Blob. * BUG FIX: the original used an else-if chain, so every document got at most ONE * field populated (whichever key matched first). Each field must be copied * independently; null values are skipped instead of throwing NPE. * @param client unused here, kept for signature compatibility * @param response the search response to convert * @return one Blob per hit, in hit order */ public List<Blob> responseToList(Client client, SearchResponse response) { SearchHits hits = response.getHits(); List<Blob> list = new ArrayList<Blob>(); for (int i = 0; i < hits.getHits().length; i++) { Map<String, Object> map = hits.getAt(i).getSourceAsMap(); Blob blob = new Blob(); Object v; if ((v = map.get("title")) != null) { blob.setTitle(v.toString()); } if ((v = map.get("describe")) != null) { blob.setDescribe(v.toString()); } if ((v = map.get("url")) != null) { blob.setUrl(v.toString()); } if ((v = map.get("ImageUrl")) != null) { blob.setImageUrl(v.toString()); } if ((v = map.get("postInfo")) != null) { blob.setPostInfo(v.toString()); } list.add(blob); } return list; } /** * Returns the first page (30 docs) of the whole index. * @return all matched documents as Blobs */ public List<Blob> getAll() { SearchResponse response = client.prepareSearch(index).setTypes(type) .setQuery(QueryBuilders.matchAllQuery()) .setSize(30) .setScroll(TimeValue.timeValueMinutes(30)) // scroll context kept 30 min for follow-up pages .execute() .actionGet(); return responseToList(client, response); } /** * Multi-field search over title and describe with highlighted matches. * @param values the user's query string * @return the first 10 matched documents */ public List<Blob> getFilter(String values) { String titleField = "title"; String describeField = "describe"; // highlight matched terms in red on both searched fields HighlightBuilder hiBuilder = new HighlightBuilder(); hiBuilder.preTags("<span style=\"color:red\">"); hiBuilder.postTags("</span>"); hiBuilder.field(titleField); hiBuilder.field(describeField); SearchResponse response = client.prepareSearch(index).setTypes(type) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setQuery(QueryBuilders.multiMatchQuery(values, titleField, describeField)) .setFrom(0).setSize(10) // first page, 10 per page .setScroll(TimeValue.timeValueMinutes(30)) // scroll context kept 30 min .setExplain(true) // NOTE(review): explain adds scoring detail per hit; costly — consider removing in production .highlighter(hiBuilder) .execute() .actionGet(); return responseToList(client, response); } }
6.最后用SSM把接口映射到页面,这里就不展开了.看看具体检索效果:索引库数据达到6万多条,检索毫秒级返回,搜索速度可见非同一般.