[Python爬蟲] 07. multiprocessing 多執行續爬蟲

 · 11 mins read

什麼是分佈式爬蟲

程式一般是單 process 跑的,指令也是一條條處理的,每執行完一條指令才能跳到下一條,如果我們能合理利用計算資源,在 下載一部分網頁的時候就已經開始分析另一部分網頁了,這將會大大節省整個程序的運行時間;又或者我們能 同時下載多個網頁,同時分析多個網頁。multiprocessing 應用在需要爬蟲的工作越多的時候效果較好(也就是 Pool 裡面的任務較多時)。

  • multiprocessing 是使用電腦多核系統來並行,cpu 有多少內核便可以同時執行多少個 process。
  • 其實 mult-process 相對來說不是特別適合用來做爬蟲,因為 mult-process 比較適用於計算密集型,而爬蟲是 IO 密集型,因此 mult-process 爬蟲對速度的提升不是特別明顯。



python multiprocessing

  • 因 GIL (CPython) 緣故,multithread 需用 multiprocess 取代。
  • window 程式需在 if __name__ == '__main__': 之內運行不然會有 RuntimeError,因 window 沒有 fork,所以執行時是採用 spawn,使用 runpy 實現,所以每個 child process 會在一開始 import 原先的 module 再執行對應的 function。

process

  #!/usr/bin/env python  
  import time
  import multiprocessing  
    
  def do(n) :  
    # Obtain the name of current process  
    name = multiprocessing.current_process().name  
    print name,'starting'  
    print "worker ", n  
    return  
    
  if __name__ == '__main__' :  
    numList = []  
    for i in xrange(5) :  
      p = multiprocessing.Process(target=do, args=(i,))      #建立子process
      numList.append(p)  
      p.start()                                              #啟動子process
      p.join(timeout=0.1)                                    #等待子process結束以後再繼續往下運行,等待0.1s,可為無參數,則表示永久等待
      print "Process end."  
  >>>
  Process-1 starting
  worker 0
  Process end.
  Process-2 starting
  worker 1
  Process end.
  Process-3 starting
  worker 2
  Process end.
  Process-4 starting
  worker 3
  Process end.
  Process-5 starting
  worker 4
  Process end.



Pool

  • 同時操作多個文件目錄或者遠程控制多台主機,並行操作可以節約大量的時間。
  • 使用於任務較多時,Pool 可以提供指定數量的 process 供用戶調用,當有新的請求提交到 Pool 中時,如果 pool 還沒有滿,就會創建一個新的 process 來執行請求。
  • 如果 pool 滿,請求就會告知先等待,直到 pool 中有 process 結束,才會創建新的 process 來執行這些請求。
  # coding: utf-8
  from multiprocessing import Pool
  import time
  
  def task(msg):
    print 'hello, %s' % msg
    time.sleep(1)
    return 'msg: %s' % msg
  
  if __name__ == '__main__':
    pool = Pool(processes=4)         #Create 4 process
  
    results = []
    msgs = [x for x in range(10)]
    results = pool.map(task, msgs)   #使process阻塞直到返回結果
  
    pool.close()                     #關閉pool使其不在接受新的任務。 
    pool.join()                      #主process阻塞等待子process的退出,必須在close之後使用
  
    print 'processes done, result:'
  
    for x in results:                #獲取返回值
      print x



Lock

  • 當多個 process 需要訪問共享資源的時候,Lock 可以用來避免訪問的衝突。
  # coding: utf-8
  from multiprocessing import Lock, Process
  import time
  
  def task1(lock, f):
    with lock:
      f = open(f, 'w+')
      f.write('hello ')
      time.sleep(1)
      f.close()
  
  def task2(lock, f):
    lock.acquire()
    try:
      f = open(f, 'a+')
      time.sleep(1)
      f.write('world!')
    except Exception as e:
      print(e)
    finally:
      f.close()
      lock.release()
  
  if __name__ == '__main__':
    lock = Lock()
    fn = './file.txt'
  
    start = time.time()
    p1 = Process(target=task1, args=(lock, fn,))
    p2 = Process(target=task2, args=(lock, fn,))
  
    p1.start()
    p2.start()
  
    p1.join()
    p2.join()
  
    end = time.time()
    print 'time cost: %s seconds' % (end - start)
  
    with open(fn, 'r') as f:
      for x in f.readlines():
        print x
  time cost: 2.0059568882 seconds
  hello world!



共享資源

mult-process 資源不共享,如果需要共享資源可以使用 Queue,Array,Manager(普通的全局變量是不能被子 process 所共享的,只有通過 Multiprocessing 組件構造的數據結構可以被共享)。

Queue

  • 共享資源( share memory ),只適用 Process,不能再 Pool 中使用。

      from multiprocessing import Process, Queue  
      def test(queue):  
          queue.put("Hello World")  
      if __name__ == '__main__':
          q = Queue()  
          p = Process(target=test, args=(q,))  #需要將q對象傳遞給子Process
          p.start()
          print q.get()
    

Array

  • 共享資源( share memory ),只適用於 Process。

      from multiprocessing import Process, Array
      def test(a):
          for i in range(len(a)):
              a[i] = -a[i]
      if __name__ == '__main__':
          arr = Array('i', range(10))
          p = Process(target=test, args=(arr))  #需要將arr對象傳遞給子Process
          p.start()
          p.join()
          print arr[:]
    

Manager

  • 可以適用 Pool。

      from multiprocessing import Manager
    
      def test(i,lists):
          print i
          lists.append(i)
    
      if __name__=="__main__":
          pool=Pool()
          lists=Manager().list()                          #Manager實例化只能寫在main()裏面
          for i in xrange(10000000):
              if len(lists)<=0:
              '''
              在創建子Process時,需要將lists對象傳入,不然無法共享。
              '''
                  pool.apply_async(test,args=(i,lists))   #需要將lists對象傳遞給子process,這裏比較耗資源,原因可能是因為Manager是基於通信的
              else:
                  break
    



應用 process 爬蟲

  import multiprocessing
  import time
  from urllib.request import urlopen, urljoin
  from bs4 import BeautifulSoup
  import re
  
  # base_url = "http://127.0.0.1:4000/"
  base_url = 'https://morvanzhou.github.io/'
  def crawl(url):  #--爬取網頁--
      response = urlopen(url)                                         #打開網頁
      # time.sleep(0.1)                                               #添加了一個下載延遲的時間當範例
    return response.read().decode()

  def parse(html):  #--解析網頁--
      soup = BeautifulSoup(html, 'lxml')                              #用BeautifulSoup返回找到的信息
      urls = soup.find_all('a', {"href": re.compile('^/.+?/$')})
      title = soup.find('h1').get_text().strip()
      page_urls = set([urljoin(base_url, url['href']) for url in urls])
      url = soup.find('meta', {'property': "og:url"})['content']
      return title, page_urls, url

  #定義兩個set,用來蒐集爬過的網頁和沒爬過的
  unseen = set([base_url,])
  seen = set()
  • process:消耗 52.3 秒(慢)

      if __name__ == '__main__':
          if base_url != "http://127.0.0.1:4000/":
              restricted_crawl = True                                      #限制爬取數量,預防頻繁抓取
          else:
              restricted_crawl = False
            
          while len(unseen) != 0:
              if restricted_crawl and len(seen) >= 20:
                  break
              htmls = [crawl(url) for url in unseen]                       #爬取網頁
              results = [parse(html) for html in htmls]                    #解析網頁
        
              seen.update(unseen)
              unseen.clear()
            
              for title, page_urls, url in results:
                  unseen.update(page_urls - seen)
    
  • multiprocessing 分佈式爬法:消耗 16.3 秒(快)

      if __name__ == '__main__':
          pool = multiprocessing.Pool(4)
          while len(unseen) != 0:
              crawl_jobs = [pool.apply_async(crawl, args=(url,)) for url in unseen]  #爬取網頁pool
              htmls = [j.get() for j in crawl_jobs]
            
              parse_jobs = [pool.apply_async(parse, args=(html,)) for html in htmls]  #解析網頁pool
              results = [j.get() for j in parse_jobs]
    



應用 mult-process pool 爬蟲 [1]

#coding:utf-8 
import multiprocessing from bs4 
import BeautifulSoup 
import requests  

def pageUrls(url): 
  web_data = requests.get(url) 
  soup = BeautifulSoup(web_data.text, 'lxml') 
  sum = int(soup.select('span.total > em:nth-of-type(1)')[0].get_text()) 
  pageNum = sum/50 
  return [url+'/loupan/s?p={}'.format(str(i)) for i in range(1, pageNum+2, 1)]  

def detailPage(myurl): 
  urls = pageUrls(myurl) 
  for url in urls: 
    web_data = requests.get(url) 
    soup = BeautifulSoup(web_data.text, 'lxml') 
    titles = soup.select('div.list-results > div.key-list > div > div.infos > div > h3 > a') 
    for title in titles: 
      print url 
      print title.get_text() 
      print title.get('href')  

def main(urls): 
  pool = multiprocessing.Pool(multiprocessing.cpu_count()) 
  for url in urls: 
    pool.apply_async(detailPage, (url, )) 
  # pool.map(detailPage, urls) 
  pool.close() 
  pool.join()   

if __name__ == "__main__": 
  startUrl = 'http://tj.fang.anjuke.com/?from=navigation' 
  web_data = requests.get(startUrl) 
  soup = BeautifulSoup(web_data.text, 'lxml') 
  urls = [url.get('href') for url in soup.select('.city-mod > dl > dd > a')] 
  main(urls)
  • pool = multiprocessing.Pool(multiprocessing.cpu_count()):cpu 有多少內核便可以同時執行多少個 process,()可以為空,預設 cpu 最大核心數。
  • pool.apply_async(detailPage, (url, )):從 pool 中取出一個 process 執行 func,args 為 func 的參數。
  • pool.close():關閉 pool,pool 不會再創建新的 process。
  • pool.join():用來等待 pool 中的 worker process 執行完畢,防止主 process 在 worker process 結束前結束。



ref: