什麼是分佈式爬蟲
程式一般是單 process 跑的,指令也是一條條處理的,每執行完一條指令才能跳到下一條,如果我們能合理利用計算資源,在 下載一部分網頁的時候就已經開始分析另一部分網頁了,這將會大大節省整個程序的運行時間;又或者我們能 同時下載多個網頁,同時分析多個網頁。multiprocessing 應用在需要爬蟲的工作越多的時候效果較好(也就是 Pool 裡面的任務較多時)。
- multiprocessing 是使用電腦多核系統來並行,cpu 有多少內核便可以同時執行多少個 process。
其實 mult-process 相對來說不是特別適合用來做爬蟲,因為 mult-process 比較適用於計算密集型,而爬蟲是 IO 密集型,因此 mult-process 爬蟲對速度的提升不是特別明顯。
python multiprocessing
- 因 GIL (CPython) 緣故,multithread 需用 multiprocess 取代。
- window 程式需在
if __name__ == '__main__':
之內運行不然會有 RuntimeError,因 window 沒有 fork,所以執行時是採用 spawn,使用 runpy 實現,所以每個 child process 會在一開始 import 原先的 module 再執行對應的 function。
process
#!/usr/bin/env python
import time
import multiprocessing
def do(n) :
# Obtain the name of current process
name = multiprocessing.current_process().name
print name,'starting'
print "worker ", n
return
if __name__ == '__main__' :
numList = []
for i in xrange(5) :
p = multiprocessing.Process(target=do, args=(i,)) #建立子process
numList.append(p)
p.start() #啟動子process
p.join(timeout=0.1) #等待子process結束以後再繼續往下運行,等待0.1s,可為無參數,則表示永久等待
print "Process end."
>>>
Process-1 starting
worker 0
Process end.
Process-2 starting
worker 1
Process end.
Process-3 starting
worker 2
Process end.
Process-4 starting
worker 3
Process end.
Process-5 starting
worker 4
Process end.
Pool
- 同時操作多個文件目錄或者遠程控制多台主機,並行操作可以節約大量的時間。
- 使用於任務較多時,Pool 可以提供指定數量的 process 供用戶調用,當有新的請求提交到 Pool 中時,如果 pool 還沒有滿,就會創建一個新的 process 來執行請求。
- 如果 pool 滿,請求就會告知先等待,直到 pool 中有 process 結束,才會創建新的 process 來執行這些請求。
# coding: utf-8
from multiprocessing import Pool
import time
def task(msg):
print 'hello, %s' % msg
time.sleep(1)
return 'msg: %s' % msg
if __name__ == '__main__':
pool = Pool(processes=4) #Create 4 process
results = []
msgs = [x for x in range(10)]
results = pool.map(task, msgs) #使process阻塞直到返回結果
pool.close() #關閉pool使其不在接受新的任務。
pool.join() #主process阻塞等待子process的退出,必須在close之後使用
print 'processes done, result:'
for x in results: #獲取返回值
print x
Lock
- 當多個 process 需要訪問共享資源的時候,Lock 可以用來避免訪問的衝突。
# coding: utf-8
from multiprocessing import Lock, Process
import time
def task1(lock, f):
with lock:
f = open(f, 'w+')
f.write('hello ')
time.sleep(1)
f.close()
def task2(lock, f):
lock.acquire()
try:
f = open(f, 'a+')
time.sleep(1)
f.write('world!')
except Exception as e:
print(e)
finally:
f.close()
lock.release()
if __name__ == '__main__':
lock = Lock()
fn = './file.txt'
start = time.time()
p1 = Process(target=task1, args=(lock, fn,))
p2 = Process(target=task2, args=(lock, fn,))
p1.start()
p2.start()
p1.join()
p2.join()
end = time.time()
print 'time cost: %s seconds' % (end - start)
with open(fn, 'r') as f:
for x in f.readlines():
print x
time cost: 2.0059568882 seconds
hello world!
共享資源
mult-process 資源不共享,如果需要共享資源可以使用 Queue,Array,Manager(普通的全局變量是不能被子 process 所共享的,只有通過 Multiprocessing 組件構造的數據結構可以被共享)。
Queue
共享資源( share memory ),只適用 Process,不能再 Pool 中使用。
from multiprocessing import Process, Queue def test(queue): queue.put("Hello World") if __name__ == '__main__': q = Queue() p = Process(target=test, args=(q,)) #需要將q對象傳遞給子Process p.start() print q.get()
Array
共享資源( share memory ),只適用於 Process。
from multiprocessing import Process, Array def test(a): for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': arr = Array('i', range(10)) p = Process(target=test, args=(arr)) #需要將arr對象傳遞給子Process p.start() p.join() print arr[:]
Manager
可以適用 Pool。
from multiprocessing import Manager def test(i,lists): print i lists.append(i) if __name__=="__main__": pool=Pool() lists=Manager().list() #Manager實例化只能寫在main()裏面 for i in xrange(10000000): if len(lists)<=0: ''' 在創建子Process時,需要將lists對象傳入,不然無法共享。 ''' pool.apply_async(test,args=(i,lists)) #需要將lists對象傳遞給子process,這裏比較耗資源,原因可能是因為Manager是基於通信的 else: break
應用 process 爬蟲
import multiprocessing
import time
from urllib.request import urlopen, urljoin
from bs4 import BeautifulSoup
import re
# base_url = "http://127.0.0.1:4000/"
base_url = 'https://morvanzhou.github.io/'
def crawl(url): #--爬取網頁--
response = urlopen(url) #打開網頁
# time.sleep(0.1) #添加了一個下載延遲的時間當範例
return response.read().decode()
def parse(html): #--解析網頁--
soup = BeautifulSoup(html, 'lxml') #用BeautifulSoup返回找到的信息
urls = soup.find_all('a', {"href": re.compile('^/.+?/$')})
title = soup.find('h1').get_text().strip()
page_urls = set([urljoin(base_url, url['href']) for url in urls])
url = soup.find('meta', {'property': "og:url"})['content']
return title, page_urls, url
#定義兩個set,用來蒐集爬過的網頁和沒爬過的
unseen = set([base_url,])
seen = set()
process
:消耗 52.3 秒(慢)if __name__ == '__main__': if base_url != "http://127.0.0.1:4000/": restricted_crawl = True #限制爬取數量,預防頻繁抓取 else: restricted_crawl = False while len(unseen) != 0: if restricted_crawl and len(seen) >= 20: break htmls = [crawl(url) for url in unseen] #爬取網頁 results = [parse(html) for html in htmls] #解析網頁 seen.update(unseen) unseen.clear() for title, page_urls, url in results: unseen.update(page_urls - seen)
multiprocessing
分佈式爬法:消耗 16.3 秒(快)if __name__ == '__main__': pool = multiprocessing.Pool(4) while len(unseen) != 0: crawl_jobs = [pool.apply_async(crawl, args=(url,)) for url in unseen] #爬取網頁pool htmls = [j.get() for j in crawl_jobs] parse_jobs = [pool.apply_async(parse, args=(html,)) for html in htmls] #解析網頁pool results = [j.get() for j in parse_jobs]
應用 mult-process pool 爬蟲 [1]
#coding:utf-8
import multiprocessing from bs4
import BeautifulSoup
import requests
def pageUrls(url):
web_data = requests.get(url)
soup = BeautifulSoup(web_data.text, 'lxml')
sum = int(soup.select('span.total > em:nth-of-type(1)')[0].get_text())
pageNum = sum/50
return [url+'/loupan/s?p={}'.format(str(i)) for i in range(1, pageNum+2, 1)]
def detailPage(myurl):
urls = pageUrls(myurl)
for url in urls:
web_data = requests.get(url)
soup = BeautifulSoup(web_data.text, 'lxml')
titles = soup.select('div.list-results > div.key-list > div > div.infos > div > h3 > a')
for title in titles:
print url
print title.get_text()
print title.get('href')
def main(urls):
pool = multiprocessing.Pool(multiprocessing.cpu_count())
for url in urls:
pool.apply_async(detailPage, (url, ))
# pool.map(detailPage, urls)
pool.close()
pool.join()
if __name__ == "__main__":
startUrl = 'http://tj.fang.anjuke.com/?from=navigation'
web_data = requests.get(startUrl)
soup = BeautifulSoup(web_data.text, 'lxml')
urls = [url.get('href') for url in soup.select('.city-mod > dl > dd > a')]
main(urls)
pool = multiprocessing.Pool(multiprocessing.cpu_count())
:cpu 有多少內核便可以同時執行多少個 process,()可以為空,預設 cpu 最大核心數。pool.apply_async(detailPage, (url, ))
:從 pool 中取出一個 process 執行 func,args 為 func 的參數。pool.close()
:關閉 pool,pool 不會再創建新的 process。pool.join()
:用來等待 pool 中的 worker process 執行完畢,防止主 process 在 worker process 結束前結束。
ref:
- 加速爬蟲: 多進程分佈式
- python爬蟲的最佳實踐(六)–爬蟲中的多進程
- Python 文章收集 multiprocessing 模塊介紹
- Python 多进程 multiprocessing.Pool类详解
- python多進程multiprocessing共享資源