
Crawling the Web with Python


Browsing pages with the Mechanize library

#!/usr/bin/python
#coding=utf-8
import mechanize

def viewPage(url):
    browser = mechanize.Browser()
    page = browser.open(url)
    source_code = page.read()
    print source_code

viewPage('http://www.imooc.com/')
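Note: mechanize.Browser honors robots.txt by default and raises an HTTP 403 "request disallowed by robots.txt" error for pages a site excludes. The anonBrowser class later in this article disables that check; if a target blocks the fetch, you can do the same here:

browser = mechanize.Browser()
# skip the robots.txt check (the same call anonBrowser uses below)
browser.set_handle_robots(False)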

Using a proxy server, User-Agent, and cookies:

#!/usr/bin/python
#coding=utf-8
import mechanize

def testProxy(url, proxy):
    browser = mechanize.Browser()
    browser.set_proxies(proxy)
    page = browser.open(url)
    source_code = page.read()
    print source_code

url = 'http://2017.ip138.com/ic.asp'
hideMeProxy = {'http': '139.196.202.164:9001'}
testProxy(url, hideMeProxy)

Testing the User-Agent:

#!/usr/bin/python
#coding=utf-8
import mechanize

def testUserAgent(url, userAgent):
    browser = mechanize.Browser()
    browser.addheaders = userAgent
    page = browser.open(url)
    source_code = page.read()
    print source_code

url = 'http://whatismyuseragent.dotdoh.com/'
userAgent = [('User-agent', 'Mozilla/5.0 (X11; U; Linux 2.4.2-2 i586; en-US; m18) Gecko/20010131 Netscape6/6.01')]
testUserAgent(url, userAgent)
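The heading above also mentions cookies. As a minimal sketch (the printCookies helper and its target URL are illustrative, not part of the original scripts), you can attach a cookielib cookie jar to the browser with the same set_cookiejar call the anonBrowser class uses below, then print whatever cookies the site sets:

#!/usr/bin/python
#coding=utf-8
import mechanize
import cookielib

def printCookies(url):
    browser = mechanize.Browser()
    # attach a jar so cookies can be inspected after the request
    cookie_jar = cookielib.LWPCookieJar()
    browser.set_cookiejar(cookie_jar)
    page = browser.open(url)
    for cookie in cookie_jar:
        print cookie

printCookies('http://www.imooc.com/')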

Integrating the code into a Python class, anonBrowser

#!/usr/bin/python
#coding=utf-8
import mechanize
import cookielib
import random
import time

class anonBrowser(mechanize.Browser):
    def __init__(self, proxies = [], user_agents = []):
        mechanize.Browser.__init__(self)
        self.set_handle_robots(False)
        # list of proxy servers available to the user
        self.proxies = proxies
        # list of user agents
        self.user_agents = user_agents + ['Mozilla/4.0 ', 'FireFox/6.01', 'ExactSearch', 'Nokia7110/1.0']
        self.cookie_jar = cookielib.LWPCookieJar()
        self.set_cookiejar(self.cookie_jar)
        self.anonymize()

    # clear all cookies
    def clear_cookies(self):
        self.cookie_jar = cookielib.LWPCookieJar()
        self.set_cookiejar(self.cookie_jar)

    # pick a random user agent from the list
    def change_user_agent(self):
        index = random.randrange(0, len(self.user_agents))
        self.addheaders = [('User-agent', self.user_agents[index])]

    # pick a random proxy from the list
    def change_proxy(self):
        if self.proxies:
            index = random.randrange(0, len(self.proxies))
            self.set_proxies({'http': self.proxies[index]})

    # call the three methods above to change the UA and proxy and clear cookies,
    # improving anonymity; the sleep parameter pauses the process to further
    # improve the anonymizing effect
    def anonymize(self, sleep = False):
        self.clear_cookies()
        self.change_user_agent()
        self.change_proxy()

        if sleep:
            time.sleep(60)

Testing whether each visit uses a different cookie:

#!/usr/bin/python
#coding=utf-8
from anonBrowser import *

# user_agents takes plain strings; change_user_agent builds the
# ('User-agent', value) header tuple itself
ab = anonBrowser(proxies=[], user_agents=['superSecretBroswer'])

for attempt in range(1, 5):
    # anonymize before every visit
    ab.anonymize()
    print '[*] Fetching page'
    response = ab.open('http://www.kittenwar.com/')
    for cookie in ab.cookie_jar:
        print cookie

Parsing href links with BeautifulSoup:

#!/usr/bin/python
#coding=utf-8
from anonBrowser import *
from BeautifulSoup import BeautifulSoup
import optparse
import re

def printLinks(url):
    ab = anonBrowser()
    ab.anonymize()
    page = ab.open(url)
    html = page.read()
    # parse href links with the re module
    try:
        print '[+] Printing Links From Regex.'
        link_finder = re.compile('href="(.*?)"')
        links = link_finder.findall(html)
        for link in links:
            print link
    except:
        pass
    # parse href links with the BeautifulSoup module
    try:
        print '\n[+] Printing Links From BeautifulSoup.'
        soup = BeautifulSoup(html)
        links = soup.findAll(name='a')
        for link in links:
            if link.has_key('href'):
                print link['href']
    except:
        pass

def main():
    parser = optparse.OptionParser('[*]Usage: python linkParser.py -u <target url>')
    parser.add_option('-u', dest='tgtURL', type='string', help='specify target url')
    (options, args) = parser.parse_args()
    url = options.tgtURL

    if url == None:
        print parser.usage
        exit(0)
    else:
        printLinks(url)

if __name__ == '__main__':
    main()
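A sample run, assuming anonBrowser.py from above sits in the same directory (the target URL is just an example):

python linkParser.py -u http://www.imooc.com/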

Mirroring images with BeautifulSoup

#!/usr/bin/python
#coding=utf-8
from anonBrowser import *
from BeautifulSoup import BeautifulSoup
import os
import optparse

def mirrorImages(url, dir):
    ab = anonBrowser()
    ab.anonymize()
    html = ab.open(url)
    soup = BeautifulSoup(html)
    image_tags = soup.findAll('img')

    for image in image_tags:
        # note: lstrip() strips any of the given characters ('h','t','p',':','/')
        # from the left end, which here happens to remove the 'http://' prefix
        filename = image['src'].lstrip('http://')
        filename = os.path.join(dir, filename.replace('/', '_'))
        print '[+] Saving ' + str(filename)
        data = ab.open(image['src']).read()
        # go back to the original page
        ab.back()
        save = open(filename, 'wb')
        save.write(data)
        save.close()

def main():
    parser = optparse.OptionParser('[*]Usage: python imageMirror.py -u <target url> -d <destination directory>')
    parser.add_option('-u', dest='tgtURL', type='string', help='specify target url')
    parser.add_option('-d', dest='dir', type='string', help='specify destination directory')
    (options, args) = parser.parse_args()
    url = options.tgtURL
    dir = options.dir
    if url == None or dir == None:
        print parser.usage
        exit(0)
    else:
        try:
            mirrorImages(url, dir)
        except Exception, e:
            print '[-] Error Mirroring Images.'
            print '[-] ' + str(e)

if __name__ == '__main__':
    main()
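A sample run (the destination directory is illustrative and must already exist, since the script never creates it; the script also assumes each img src is an absolute URL, as a relative path would fail in ab.open):

mkdir mirror_images
python imageMirror.py -u http://www.imooc.com/ -d mirror_images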

Interacting with the Google API in Python

#!/usr/bin/python
#coding=utf-8
import urllib
from anonBrowser import *

def google(search_term):
    ab = anonBrowser()
    # URL-encode the search term
    search_term = urllib.quote_plus(search_term)
    response = ab.open('https://www.googleapis.com/customsearch/v1?key=yourAPIKey&cx=yourSearchID&num=1&alt=json&q=' + search_term)
    print response.read()

google('Boundock Saint')

Next, process the JSON-formatted data: simply load it with the json library's load() function

#!/usr/bin/python
#coding=utf-8
import urllib
from anonBrowser import *
import json

def google(search_term):
    ab = anonBrowser()
    # URL-encode the search term
    search_term = urllib.quote_plus(search_term)
    response = ab.open('https://www.googleapis.com/customsearch/v1?key=yourAPIKey&cx=yourSearchID&num=1&alt=json&q=' + search_term)
    objects = json.load(response)
    print objects

google('Boundock Saint')

Writing a Google_Result class to hold the titles parsed out of the JSON data

#!/usr/bin/python
#coding=utf-8
import urllib
from anonBrowser import *
import json
import optparse

class Google_Result:
    def __init__(self, title, text, url):
        self.title = title
        self.text = text
        self.url = url

    def __repr__(self):
        return self.title

def google(search_term):
    ab = anonBrowser()
    # URL-encode the search term
    search_term = urllib.quote_plus(search_term)
    response = ab.open('https://www.googleapis.com/customsearch/v1?key=yourAPIKey&cx=yourSearchID&num=1&alt=json&q=' + search_term)
    objects = json.load(response)
    results = []

    for result in objects['items']:
        url = result['link']
        title = result['title']
        text = result['snippet']
        print url
        print title
        print text
        new_gr = Google_Result(title, text, url)
        results.append(new_gr)
    return results

def main():
    parser = optparse.OptionParser('[*]Usage: python anonGoogle.py -k <keywords>')
    parser.add_option('-k', dest='keyword', type='string', help='specify google keyword')
    (options, args) = parser.parse_args()
    keyword = options.keyword

    if options.keyword == None:
        print parser.usage
        exit(0)
    else:
        results = google(keyword)
        print results

if __name__ == '__main__':
    main()
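A sample run, assuming yourAPIKey and yourSearchID in the URL have been replaced with real Google Custom Search credentials:

python anonGoogle.py -k 'Boundock Saint'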

Parsing a Twitter user's page with Python

#!/usr/bin/python
#coding=utf-8
import json
import urllib
from anonBrowser import *

class reconPerson:
    def __init__(self, first_name, last_name, job='', social_media={}):
        self.first_name = first_name
        self.last_name = last_name
        self.job = job
        self.social_media = social_media

    def __repr__(self):
        return self.first_name + ' ' + self.last_name + ' has job ' + self.job

    def get_social(self, media_name):
        if self.social_media.has_key(media_name):
            return self.social_media[media_name]
        return None

    def query_twitter(self, query):
        query = urllib.quote_plus(query)
        results = []
        browser = anonBrowser()
        response = browser.open('http://search.twitter.com/search.json?q=' + query)
        json_objects = json.load(response)
        for result in json_objects['results']:
            new_result = {}
            new_result['from_user'] = result['from_user_name']
            new_result['geo'] = result['geo']
            new_result['tweet'] = result['text']
            results.append(new_result)
        return results

ap = reconPerson('Boondock', 'Saint')
print ap.query_twitter('from:th3j35t3r since:2010-01-01 include:retweets')

Extracting geolocation information from tweets

#!/usr/bin/python
#coding=utf-8
import json
import urllib
import optparse
from anonBrowser import *

def get_tweets(handle):
    query = urllib.quote_plus('from:' + handle + ' since:2009-01-01 include:retweets')
    tweets = []
    browser = anonBrowser()
    browser.anonymize()
    response = browser.open('http://search.twitter.com/search.json?q=' + query)
    json_objects = json.load(response)
    for result in json_objects['results']:
        new_result = {}
        new_result['from_user'] = result['from_user_name']
        new_result['geo'] = result['geo']
        new_result['tweet'] = result['text']
        tweets.append(new_result)
    return tweets

def load_cities(cityFile):
    cities = []
    for line in open(cityFile).readlines():
        city = line.strip('\n').strip('\r').lower()
        cities.append(city)
    return cities

def twitter_locate(tweets, cities):
    locations = []
    locCnt = 0
    cityCnt = 0
    tweetsText = ""

    for tweet in tweets:
        if tweet['geo'] != None:
            locations.append(tweet['geo'])
            locCnt += 1

        tweetsText += tweet['tweet'].lower()

    for city in cities:
        if city in tweetsText:
            locations.append(city)
            cityCnt += 1

    print "[+] Found " + str(locCnt) + " locations via Twitter API and " + str(cityCnt) + " locations from text search."
    return locations

def main():
    parser = optparse.OptionParser('[*]Usage: python twitterGeo.py -u <twitter handle> [-c <list of cities>]')
    parser.add_option('-u', dest='handle', type='string', help='specify twitter handle')
    parser.add_option('-c', dest='cityFile', type='string', help='specify file containing cities to search')
    (options, args) = parser.parse_args()
    handle = options.handle
    cityFile = options.cityFile
    if (handle == None):
        print parser.usage
        exit(0)
    cities = []
    if (cityFile != None):
        cities = load_cities(cityFile)
    tweets = get_tweets(handle)
    locations = twitter_locate(tweets, cities)
    print "[+] Locations: " + str(locations)

if __name__ == '__main__':
    main()
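A sample run, reusing the th3j35t3r handle from earlier and the mlb-cities.txt file referenced in the phishing script below (one city name per line):

python twitterGeo.py -u th3j35t3r -c mlb-cities.txt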

Parsing a Twitter user's interests with regular expressions

#!/usr/bin/python
#coding=utf-8
import json
import re
import urllib
import urllib2
import optparse
from anonBrowser import *

def get_tweets(handle):
    query = urllib.quote_plus('from:' + handle + ' since:2009-01-01 include:retweets')
    tweets = []
    browser = anonBrowser()
    browser.anonymize()
    response = browser.open('http://search.twitter.com/search.json?q=' + query)
    json_objects = json.load(response)
    for result in json_objects['results']:
        new_result = {}
        new_result['from_user'] = result['from_user_name']
        new_result['geo'] = result['geo']
        new_result['tweet'] = result['text']
        tweets.append(new_result)
    return tweets

def find_interests(tweets):
    interests = {}
    interests['links'] = []
    interests['users'] = []
    interests['hashtags'] = []

    for tweet in tweets:
        text = tweet['tweet']
        # match an http token either at the end of the text (\Z) or followed by a space
        links = re.compile('(http.*?)\Z|(http.*?) ').findall(text)

        for link in links:
            if link[0]:
                link = link[0]
            elif link[1]:
                link = link[1]
            else:
                continue

            try:
                # follow the (possibly shortened) link to record its final URL
                response = urllib2.urlopen(link)
                full_link = response.url
                interests['links'].append(full_link)
            except:
                pass
        interests['users'] += re.compile('(@\w+)').findall(text)
        interests['hashtags'] += re.compile('(#\w+)').findall(text)

    interests['users'].sort()
    interests['hashtags'].sort()
    interests['links'].sort()

    return interests

def main():
    parser = optparse.OptionParser('[*]Usage: python twitterInterests.py -u <twitter handle>')
    parser.add_option('-u', dest='handle', type='string', help='specify twitter handle')
    (options, args) = parser.parse_args()
    handle = options.handle
    if handle == None:
        print parser.usage
        exit(0)

    tweets = get_tweets(handle)
    interests = find_interests(tweets)
    print '\n[+] Links.'
    for link in set(interests['links']):
        print ' [+] ' + str(link)

    print '\n[+] Users.'
    for user in set(interests['users']):
        print ' [+] ' + str(user)

    print '\n[+] HashTags.'
    for hashtag in set(interests['hashtags']):
        print ' [+] ' + str(hashtag)

if __name__ == '__main__':
    main()

Writing a reconPerson class that encapsulates all the code for scraping geolocation, interests, and the Twitter page:

#!/usr/bin/python
#coding=utf-8
import urllib
from anonBrowser import *
import json
import re
import urllib2

class reconPerson:
    def __init__(self, handle):
        self.handle = handle
        self.tweets = self.get_tweets()

    def get_tweets(self):
        query = urllib.quote_plus('from:' + self.handle + ' since:2009-01-01 include:retweets')
        tweets = []
        browser = anonBrowser()
        browser.anonymize()
        response = browser.open('http://search.twitter.com/search.json?q=' + query)
        json_objects = json.load(response)
        for result in json_objects['results']:
            new_result = {}
            new_result['from_user'] = result['from_user_name']
            new_result['geo'] = result['geo']
            new_result['tweet'] = result['text']
            tweets.append(new_result)
        return tweets

    def find_interests(self):
        interests = {}
        interests['links'] = []
        interests['users'] = []
        interests['hashtags'] = []

        for tweet in self.tweets:
            text = tweet['tweet']
            links = re.compile('(http.*?)\Z|(http.*?) ').findall(text)

            for link in links:
                if link[0]:
                    link = link[0]
                elif link[1]:
                    link = link[1]
                else:
                    continue
                try:
                    response = urllib2.urlopen(link)
                    full_link = response.url
                    interests['links'].append(full_link)
                except:
                    pass
            interests['users'] += re.compile('(@\w+)').findall(text)
            interests['hashtags'] += re.compile('(#\w+)').findall(text)

        interests['users'].sort()
        interests['hashtags'].sort()
        interests['links'].sort()
        return interests

    def twitter_locate(self, cityFile):
        cities = []
        if cityFile != None:
            for line in open(cityFile).readlines():
                city = line.strip('\n').strip('\r').lower()
                cities.append(city)

        locations = []
        locCnt = 0
        cityCnt = 0
        tweetsText = ''

        for tweet in self.tweets:
            if tweet['geo'] != None:
                locations.append(tweet['geo'])
                locCnt += 1
            tweetsText += tweet['tweet'].lower()

        for city in cities:
            if city in tweetsText:
                locations.append(city)
                cityCnt += 1

        return locations
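Note that the phishing script below imports this class with from twitterClass import *, so save this file as twitterClass.py. A quick interactive check (the handle and city file are the ones used elsewhere in this article) might look like:

#!/usr/bin/python
#coding=utf-8
from twitterClass import *

rp = reconPerson('th3j35t3r')
print rp.find_interests()
print rp.twitter_locate('mlb-cities.txt')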

Sending mail to a target with smtplib

#!/usr/bin/python
#coding=utf-8
import smtplib
from email.mime.text import MIMEText

def sendMail(user, pwd, to, subject, text):
    msg = MIMEText(text)
    msg['From'] = user
    msg['To'] = to
    msg['Subject'] = subject
    try:
        smtpServer = smtplib.SMTP('smtp.gmail.com', 587)
        print "[+] Connecting To Mail Server."
        smtpServer.ehlo()
        print "[+] Starting Encrypted Session."
        smtpServer.starttls()
        smtpServer.ehlo()
        print "[+] Logging Into Mail Server."
        smtpServer.login(user, pwd)
        print "[+] Sending Mail."
        smtpServer.sendmail(user, to, msg.as_string())
        smtpServer.close()
        print "[+] Mail Sent Successfully."
    except:
        print "[-] Sending Mail Failed."

user = 'username'
pwd = 'password'
sendMail(user, pwd, 'target@tgt.tgt', 'Re: Important', 'Test Message')

Phishing with smtplib

#!/usr/bin/python
#coding=utf-8
import smtplib
import optparse
from email.mime.text import MIMEText
from twitterClass import *
from random import choice

def sendMail(user, pwd, to, subject, text):
    msg = MIMEText(text)
    msg['From'] = user
    msg['To'] = to
    msg['Subject'] = subject
    try:
        smtpServer = smtplib.SMTP('smtp.gmail.com', 587)
        print "[+] Connecting To Mail Server."
        smtpServer.ehlo()
        print "[+] Starting Encrypted Session."
        smtpServer.starttls()
        smtpServer.ehlo()
        print "[+] Logging Into Mail Server."
        smtpServer.login(user, pwd)
        print "[+] Sending Mail."
        smtpServer.sendmail(user, to, msg.as_string())
        smtpServer.close()
        print "[+] Mail Sent Successfully."
    except:
        print "[-] Sending Mail Failed."

def main():
    parser = optparse.OptionParser('[*]Usage: python sendSam.py -u <twitter target> -t <target email> ' + '-l <gmail login> -p <gmail password>')
    parser.add_option('-u', dest='handle', type='string', help='specify twitter handle')
    parser.add_option('-t', dest='tgt', type='string', help='specify target email')
    parser.add_option('-l', dest='user', type='string', help='specify gmail login')
    parser.add_option('-p', dest='pwd', type='string', help='specify gmail password')
    (options, args) = parser.parse_args()
    handle = options.handle
    tgt = options.tgt
    user = options.user
    pwd = options.pwd
    if handle == None or tgt == None or user == None or pwd == None:
        print parser.usage
        exit(0)

    print "[+] Fetching tweets from: " + str(handle)
    spamTgt = reconPerson(handle)
    spamTgt.get_tweets()
    print "[+] Fetching interests from: " + str(handle)
    interests = spamTgt.find_interests()
    print "[+] Fetching location information from: " + str(handle)
    location = spamTgt.twitter_locate('mlb-cities.txt')

    spamMsg = "Dear " + tgt + ","

    if (location != None):
        randLoc = choice(location)
        spamMsg += " Its me from " + randLoc + "."

    if (interests['users'] != None):
        randUser = choice(interests['users'])
        spamMsg += " " + randUser + " said to say hello."

    if (interests['hashtags'] != None):
        randHash = choice(interests['hashtags'])
        spamMsg += " Did you see all the fuss about " + randHash + "?"

    if (interests['links'] != None):
        randLink = choice(interests['links'])
        spamMsg += " I really liked your link to: " + randLink + "."

    spamMsg += " Check out my link to http://evil.tgt/malware"
    print "[+] Sending Msg: " + spamMsg

    sendMail(user, pwd, tgt, 'Re: Important', spamMsg)

if __name__ == '__main__':
    main()
