Medium 爬蟲進化史


Posted by huli on 2019-07-13

前言

前幾天的時候我在 Medium 上發了這篇文:Medium 中文寫作者追蹤人數排名與不專業數據分析,內文是我用 Node.js 寫了一個簡單的 Medium 爬蟲之後整理出來的數據。

在原本那篇文章裡面有簡單提到爬蟲的資料來源,但是對技術的部分沒有太多著墨。事實上,在寫 Medium 爬蟲的時候其實踩了一些坑,與其教大家寫一個 Medium 爬蟲,不如讓大家跟我一起走過這些坑,盡可能地還原我當初在寫這個爬蟲時碰到的障礙以及解決方法,我覺得這樣會更有趣一點。

因此,這篇就是用來記錄我寫這個 Medium 爬蟲的經過,其中也會有點教學的成份在,所以看完之後你應該也能夠寫出一個類似的爬蟲,或至少你看到 source code 的時候不會一頭霧水。

雖然說最後寫出來的是這個跟使用者資料有關的爬蟲,但我一開始其實是先從文章列表開始的,因為那時候剛好有一個需求,想要把自己的文章全部爬下來。

會有這個需求是因為 Medium 內建的功能其實滿爛的,你很難找到一個作者 po 過的所有文章,或者是說很難一目瞭然。所以早期的文章除了透過 Google 以外,是很難被找到的。

所以我後來就手動做了一個文章的索引,自己整理了以前發過的所有文章。但是身為工程師,這明明就是一件可以寫程式來做的事啊!所以想嘗試看看能不能先寫一個文章列表的爬蟲。

第一次嘗試:尋找資料來源

對我來說,爬蟲的第一步也是最困難的一步就是找到資料來源。只要這一步完成了,其他的相比之下都比較簡單。

如果能拿到 Medium 的 API 那當然是最好的。若是沒有的話,就必須用 puppeteer 之類的東西去爬 HTML 然後自己 parse 了。

在 Medium 的文章列表那邊捲動一下並且打開 devtool,可以看到 medium 後面是用 GraphQL:

這個就麻煩了...我對 GraphQL 不太熟,要花時間去研究一下它的資料結構,感覺要花不少時間,於是那時我就暫時先放棄這條路,決定來試試看用 puppeteer。

第二次嘗試:puppeteer

如果你不知道什麼是 puppeteer,我在這邊簡單介紹一下。你可以想成 puppeteer 會自動幫你打開一個瀏覽器,你可以寫程式去操控這個瀏覽器。例如說我要打開一個頁面並且在這頁面上執行 JS 等等,所以使用 puppeteer 的話,爬蟲的原理就是打開某個頁面,執行一段 JS 拿到頁面上的資料。

puppeteer 用起來很簡單,只要找一下現成的範例看一下語法,改一改就可以直接拿來用了。稍微研究了一下 HTML 結構之後,可以寫出下面的程式碼:

const puppeteer = require('puppeteer')

async function main() {
  const username = 'hulitw'
  const url = 'https://medium.com/@' + username

  const browser = await puppeteer.launch({
    headless: true
  })

  // 造訪頁面
  const page = await browser.newPage()
  await page.goto(url, {
    waitUntil: 'domcontentloaded'
  })

  // 執行準備好的 script 並回傳資料 
  const data = await page.evaluate(mediumParser)
  console.log(data)

  await page.close()
  await browser.close()
}

function mediumParser() {

  // selector 是透過觀察而得來的
  const elements = document.querySelectorAll('section > div:nth-child(2) > div > div')
  const result = []
  for (let i = 0; i < elements.length; i++) {
    const h1 = elements[i].querySelector('h1')
    const a = elements[i].querySelectorAll('a')
    if (h1) {
      result.push({
        title: h1.innerText,
        link: a[3].href
      })
    }
  }
  return result
}

main()

只要觀察出 HTML 與 CSS 的規則之後,就可以取得想拿的資料。但 Medium 不好爬是因為在 class name 的部分有使用 functional CSS,而且 class 的命名都有經過處理,看起來是用程式自動去跑的,所以只要 Medium 一更新,元素的命名應該會不太一樣。

所以最後只能從 HTML 的結構下手,去把文章給抓出來。

解決了這個問題之後,還有一個問題,那就是無限捲動。Medium 跟很多網頁一樣,要一直往下滑才會載入新文章,而這邊必須觀察的規律是滑到什麼時候才要停止。

觀察之後發現當發表過的文章載入完以後,才會顯示 Highlighted by xxx 這個區塊,所以可以用這個元素有沒有出現當作終止條件。

接著可以寫一段程式碼,讓頁面不斷往下捲動直到載入所有文章為止:

/*
  要用的話就是: await scroll(page)
*/

function getArticlesCount() {
  const elements = document.querySelectorAll('section > div:nth-child(2) > div > div')
  return elements.length
}

async function scroll(page) {
  await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
  try {

    // 終止條件
    await page.waitForSelector('h4 ~ p', {
      timeout: 1000
    })
  } catch(err) {

    // 印出目前抓到的文章數目
    const count = await page.evaluate(getArticlesCount);
    console.log(`Fetching... ${count} articles`)

    // 繼續往下捲動
    await scroll(page);
  }
}

為了在 console 上讓我能看到現在的進度(可以確認程式是不是有 bug),還加了一段是每一次捲動都會印出現在畫面上有的文章數量。

做到這邊,就可以抓到使用者所有的文章標題以及連結了。

那發文日期呢?也拿得到嗎?

拿得到,但是麻煩很多。看看下面的 Medium 截圖就知道了:

如果是今年(2019)的文章,就不會顯示年份,否則的話就會顯示出發文年份。所以這邊要再經過特殊的判斷處理,而且只拿得到日期,拿不到詳細發文時間。

做到這邊,我就懶得再繼續下去了。想說有很多眉眉角角要處理,而且抓到的資料有限,還不如轉去研究 API 比較實在。

第三次嘗試:puppeteer + API

前面已經說過我那時對 GraphQL API 不熟,所以暫時放棄了。但是嘗試了 puppeteer 之後,反而讓我有了新的思路。

在 puppeteer 裡面你可以加上監聽 network response 的事件,而頁面在載入文章的時候,一定會呼叫 API 去拿文章。這樣子事情不就好辦多了嗎?我不用自己研究怎麼 call API,我讓頁面自己去 call API,我自己只要監聽 response,研究一下 response 的格式就行了!

程式碼大概是長這樣的:

const apiResponses = []

page.on('response', async (response) => {
    if (response._url.indexOf('graphql') < 0) return
    const json = await response.json()
    try {
      const post = parsePosts(json)
      apiResponses.push(...post)
    } catch (err) {
    }
})

function parsePosts(json) {
  const result = []
  try {

    // 研究到一半沒做完
    const streams = json.data.user.profileStreamConnection.stream
    for (stream of streams) {
      if (stream.itemType.__typename === 'StreamItemCompressedPostList') {

      }
    }
  } catch (err) {

  }
}

每次有新的 response 進來就可以解析一下並丟到 array 裡面,最後拿到的就會是完整的從 API 傳來的資料。

但後來我發現這條路也行不通。

為什麼呢?因為頁面在第一次載入的時候,從 Server 回傳的 HTML 就已經有前幾筆文章的資料了,往下捲動的時候才是使用 ajax 來載入新的文章。意思是說,如果我想靠監聽 ajax response 的方式拿到所有文章的資料是沒辦法的,前幾筆是拿不到的。

做到這邊的時候我有點心灰意冷,想說花了兩天寫出一個不能用的東西。抓取文章列表的部分做到這我就放棄了,懶得繼續花時間去研究,並且把心力轉向我真正想抓的東西。

最前面提到的抓文章列表的需求其實是突然蹦出來的,在這之前我有更想抓的東西:follower,我想統計臺灣寫作者的 follower 人數,然後看看自己可以排到第幾名(滿足一下虛榮心)。

在嘗試了抓文章列表並失敗以後,我有試過用類似的方式去抓 follower,但做到一半發現這樣抓的話效率也太差了,每次捲動載入 25 個 follower 的話,1000 人可是要捲動 40 次。

自己如果做不出來的話,答案就很明顯了:Google,就拜託你了!

第四次嘗試:Google 大神

直接在 Google 打上關鍵字:medium follower api,出現的第一個搜尋結果是最無用的官方 API,幾乎什麼資料都沒給,而且要申請還要寄信給客服,有夠麻煩。

但是第二個搜尋結果讓我眼睛為之一亮,是一個 gist 檔案:Medium API: get number of followers for User · GitHub

程式碼才五十行而已,很短,掃過一遍可以看到最關鍵的一行:

// BUILD THE URL TO REQUEST FROM
function generateMediumProfileUri(username) {
  return `https://medium.com/@${username}?format=json`;
}

什麼!原來還有這招,在網址後面加 ?format=json 就可以拿到 json 格式的資料,這真是太神奇了。

把得到的資料丟到 JSON Formatter 之後,可以看到大概的結構:

在這邊可以拿到使用者的個人資料以及發過的一些文章,也可以拿到我們的目標:follower!

我們順便來看一下使用者資料可以拿到些什麼:

"user":{  
   "userId":"f1fb3e40dc37",
   "name":"Huli",
   "username":"hulitw",
   "createdAt":1487549030919,
   "imageId":"1*WQyJUJBQpBNIHH8GEWE6Sg.jpeg",
   "backgroundImageId":"",
   "bio":"自學程式,後來跑去唸哲學系但沒念完,前往新加坡工作了兩年半後決定放一年的假,到處旅遊。喜歡教學,發現自己好像有把事情講得簡單又清楚的能力,相信分享與交流可以讓世界更美好。\bMedium 文章列表請參考:https://aszx87410.github.io/blog/medium",
   "allowNotes":1,
   "mediumMemberAt":1542441600000,
   "isNsfw":false,
   "isWriterProgramEnrolled":true,
   "isQuarantined":false,
   "type":"User"
}

除了基本的自介跟姓名以外,還可以拿到成為 Medium 付費會員的時間以及成為 Medium 會員的時間,還滿有趣的,還有一個 flag 也很有趣:isNsfw。

唯一缺的就是 follower 的清單了。

這邊我嘗試用一樣的方法,在 Medium 網址後面接了參數:https://medium.com/@hulitw/followers?format=json,沒想到還真的有東西!在 response 裡面可以找到 10 個 follower 的資料。

有了資料之後就確定這個 API 是有用的,再來直接跳到 response 最下面 paging 的部分:

"paging":{  
   "path":"https://medium.com/_/api/users/f1fb3e40dc37/profile/stream",
   "next":{  
      "limit":10,
      "to":"10590c54e527",
      "source":"followers",
      "page":2
   }
}

path 的部分看起來是個 API 網址,next 應該是參數,試著把這些參數帶到網址上面:https://medium.com/_/api/users/f1fb3e40dc37/profile/stream?limit=10&to=10590c54e527&source=followers&page=2 ,就出現了只有 follower 相關的資料!

試著把 limit 換一下,發現最大值應該是 25,一次可以抓 25 筆資料;page 換一下之後發現沒什麼作用,於是把 to 也改一下,發現可以成功抓到新的資料。看來分頁機制是採用 cursor based 的。

在經過了幾次嘗試之後,終於拿到了兩個 API 的網址,一個可以獲得詳細個人資料,另外一個可以拿到 follower 的列表!

資料來源確定有了之後,就可以來構思一下爬蟲的架構了。

爬蟲架構

我要怎麼樣才能儘可能爬到所有的台灣寫作者?

首先第一個問題是我們必須把範圍放大一點,因為中文寫作者裡面可能有香港來的或是中國來的,你比較難靠程式去辨別到底是哪裡來的,尤其是香港跟台灣,因為都使用繁體中文。

為了不讓問題變得更複雜,我們只要能抓到「中文使用者就好」。

那要怎麼樣才能抓到最多中文使用者?一個很簡單的策略就是我們預設中文使用者的 follower 應該都是中文使用者,所以我們只要從某個使用者開始,把他所有的 follower 都丟進一個 queue 裡面,一直持續這個動作就好。

用文字簡化就是這樣:

  1. 從 queue 裡面拿出一個使用者
  2. 把他的資料寫進資料庫
  3. 把他的所有 follower 丟進 queue
  4. 回到步驟一

這樣子就可以靠著一個使用者無限延伸出去,而且理論上來說可以抓到超級多使用者的資料。這邊之所有選擇 follower(追蹤我的人)而不是 following(我追蹤的人),是考量到追蹤的使用者可能會有別的國家的,例如說我可能會追蹤國外的工程師之類的,但因為我不寫英文,所以國外的工程師應該不會來追蹤我。這樣的話就可以讓使用者侷限在中文,符合我們的目標。

接著就是系統架構的部分,這邊依據你想達成的效率會有不同種做法。

對我來說效率會最高的就是找那種很適合用來當 queue 的 service,例如說 redis 之類的,然後資料庫的部分可以選用 MySQL 或任何你熟悉的軟體。這樣子的好處是你可以開不同機器,然後每一台機器都是一個 worker,例如說你開五台機器,就會有五個 worker 一直從 queue 裡面拿東西出來並且把 follower 丟進去。

這邊之所以開很多台機器而不是開很多 thread 或 process,是因為 rate limiting 的問題。一般 API 都會有流量限制,你如果同一個 IP 發太多 request 會被 ban 掉或者是一段時間拿不到 response,所以開再多 process 跟 thread 都沒有用,只能開不同機器來解決(或只要有辦法換 IP 的話就可以)。

後來因為我沒有很在乎效率而且懶得開很多機器,所以只打算開一台讓他慢慢抓。如果只有一個 worker 的話,queue 的部分也可以簡單做一下,這邊我就也用 MySQL 來實做簡單的 queue,讓整個爬蟲的架構變得很簡單。

我們可以來看一下資料庫的架構:

Users

id userId username name bio follower fr mediumMemberAt createdAt
自增ID 使用者 ID 前面加上 @ 就是 profile 網址 使用者名稱 自介 追蹤人數 分類 成為付費會員的時間 加入會員的時間

Queue

id userId
自增ID 使用者 ID

程式的執行流程是這樣的:

  1. 從 Queue 裡面拿出一個 userId
  2. 如果 userId 已存在 Users,回到步驟一
  3. 把他的資料寫進 Users
  4. 把他的所有 follower 丟進 Queue
  5. 回到步驟一

從 queue 拿出來的時候先確保沒有爬過這個使用者,有的話就跳過,然後把所有追蹤者再丟到 queue 裡面,這樣程式就會一直跑,直到 queue 裡面沒有東西為止。

架構設計好之後,就可以來開始 coding 啦!

第一版爬蟲

首先我們需要有一個 queue,能夠 push 跟 pop,還要能確定現在拿的 userId 是不是已經爬過了。這個很適合用 class 來實作:

class Queue {
  constructor(conn) {
    this.conn = conn
  }

  get() {
    return new Promise((resolve, reject) => {
      this.conn.query('SELECT userId from Queues limit 1', (error, results) => {
        if (error) {
          console.log(error)
          return reject(error)
        }
        if (results.length !== 1) {
          return resolve(null)
        }
        const data = results[0]
        this.conn.query('DELETE from Queues where userId=?', [data.userId], (err, results) => {
          if (error) {
            console.log(error)
            return reject(error)
          }
          return resolve(data.userId)
        })
      });
    })
  }

  check(uid) {
    return new Promise((resolve, reject) => {
      this.conn.query('SELECT userId from Users where userId=?', [uid], function (error, results) {
        if (error) {
          return reject(error)
        }
        if (results.length > 0) {
          return resolve(false)
        }
        return resolve(true)
      });
    })
  }

  push(list) {
    return new Promise((resolve, reject) => {
      const values = []
      for (let item of list) {
        values.push([item])
      }
      this.conn.query(`
        INSERT IGNORE INTO Queues (userId) VALUES ?`, [values], (err) => {
          if (err) {
            // console.log(err)
          }
          resolve()
        }
      )
    })
  }
}

有了 queue 以後可以來寫主要邏輯,主程式的架構會長這樣:

var connection = mysql.createPool({
  connectionLimit : 10,
  host     : process.env.host,
  user     : '',
  password : '',
  database : 'medium',
  charset: 'utf8mb4'
})

async function main() {
  const queue = new Queue(connection)

  // 不斷從 queue 拿東西出來
  while(true) {
    const userId = await queue.get()
    if (!userId) {
      console.log('no data from queue, end')
      break;
    }

    // 看看是否已經爬過,爬過就跳掉
    const check = await queue.check(userId)
    if (!check) {
      continue
    }

    // 拿 userId 做你想做的事
    console.log('uid:', userId)
  }
}

接著只要實作以下兩個功能就好:

  1. 抓取使用者資料
  2. 把使用者資料寫進資料庫
  3. 把 follower 丟回 queue

由於 Medium API 的 response 都會有一個防 json hijacking 的開頭,因此我們可以包裝一個函式專門來 parse API 的 response:

async function getMediumResponse(url) {
  try {
    const response = await axios.get(url)
    const json = JSON.parse(response.data.replace('])}while(1);</x>', ''))
    return json
  } catch(err) {
    return null
  } 
}

接著就可以寫兩個 function,一個抓使用者資料,一個抓 follower 資料(有出現 _ 的都是 lodash 的 function):

async function getUserInfo(uid) {
  const url = `https://medium.com/_/api/users/${uid}/profile/stream`
  const json = await getMediumResponse(url)
  if (!json) {
    return {}
  }
  const userId = _.get(json, 'payload.user.userId')
  const follower = _.get(json, `payload.references.SocialStats.${userId}.usersFollowedByCount`, 0)

  return {
    followerCount: follower,
    userId: userId,
    name: _.get(json, 'payload.user.name'),
    username: _.get(json, 'payload.user.username'),
    bio: _.get(json, 'payload.user.bio'),
    mediumMemberAt: _.get(json, 'payload.user.mediumMemberAt'),
    isWriterProgramEnrolled: _.get(json, 'payload.user.isWriterProgramEnrolled'),
    createdAt: _.get(json, 'payload.user.createdAt'),
  }
}

async function getFollowers(uid, to) {
  let url = `https://medium.com/_/api/users/${uid}/profile/stream?source=followers&limit=200`
  if (to) {
    url += '&to=' + to
  }
  const json = await getMediumResponse(url)
  if (!json) {
    return {}
  }
  const followers = _.keys(json.payload.references.Social) || []
  const nextTo = _.get(json, 'payload.paging.next.to')
  return {
    followers,
    nextTo
  }
}

基本上都是 call API 之後稍微處理一下資料,然後把我們關注的東西傳回去。

上面我們只實做了「抓一次 follower」的 function,所以最後還要再實作一個「抓全部 follower 並且丟進 queue」的 function:

async function getAllFollowers(uid, queue) {
  const followers = []
  let to = undefined
  while (true) {
    const data = await getFollowers(uid, to)
    if (!data) {
      break;
    }
    followers.push(...data.followers)
    to = data.nextTo
    console.log(uid, 'fetching...', followers.length)
    if (data.followers.length === 0 || !to) {
      break;
    }
    await queue.push(data.followers)
  }
  return followers
}

這個函式會不斷去抓 follower 出來並丟進 queue,並且印出現在總共抓了幾筆 follower 的資料,全部抓完會把所有的 follower 回傳回去(會回傳是因為一開始我是全部抓完才一次寫進 queue,但後來發現比較沒效率,所以改成現在這樣抓一次就寫一次)。

最後是把使用者資料寫進去資料庫的程式碼:

function format(time) {
  if (!time) return null
  return moment(time).format('YYYY-MM-DD HH:mm:ss')
}

function saveUserInfo(conn, info) {
  conn.query(`
    INSERT INTO Users
    (
      userId, username, name, bio, follower,
      mediumMemberAt, createdAt, isWriterProgramEnrolled
    ) VALUES ?`, [[[
      info.userId, info.username, info.name, info.bio, info.followerCount,
      format(info.mediumMemberAt), format(info.createdAt), info.isWriterProgramEnrolled
    ]]], (err) => {
      if (err) {
        // console.log(err)
      }
    }
  )
}

把這幾個核心功能的 function 寫完以後,只要修正一下我們的主程式,就可以把整個爬蟲完成了:

async function main() {
  const queue = new Queue(connection)

  while(true) {

    // 1. 從 Queue 裡面拿出一個 userId
    const userId = await queue.get()
    if (!userId) {
      console.log('no data from queue, end')
      break;
    }

    // 2. 如果 userId 已存在 Users,回到步驟一
    const check = await queue.check(userId)
    if (!check) {
      continue
    }

    console.log('uid:', userId)
    try {
      const info = await getUserInfo(userId)

      // 如果沒抓到資料有可能是被擋了,先停個 3 秒
      if (!info.userId) {
        console.log('sleep...')
        await sleep(3000)
      }

      // 3. 把他的資料寫進 Users
      saveUserInfo(connection, info)

      // 4. 把他的所有 follower 丟進 Queue
      if (info.followerCount > 0) {
        // 把 followers 放到 queue 並印出總共幾筆資料
        const followerList = await getAllFollowers(userId, queue)
        console.log('Add ' + followerList.length + ' into queue.')
      }
    } catch(err) {
      // 有錯誤就先睡個 3 秒
      console.log('error...sleep')
      await sleep(3000)
    } 
  }  
}

上面就是我們按照先前的邏輯寫出來的程式碼:

  1. 從 Queue 裡面拿出一個 userId
  2. 如果 userId 已存在 Users,回到步驟一
  3. 把他的資料寫進 Users
  4. 把他的所有 follower 丟進 Queue
  5. 回到步驟一

不過這邊額外加了一個邏輯是當呼叫 API 有問題的時候,就先暫停 3 秒鐘,這樣是為了防止被 rate limiting 擋到。但這個機制做的不是很好,因為沒有 retry,所以一但發生錯誤,這個 userId 就被跳過了。

當初的想法是只跳過一個 userId 無傷大雅,畢竟 queue 裡面可能有十萬筆的 userId,而且就算跳過,之後還是有可能再被丟到 queue 裡面,所以不做 retry 的機制也無所謂。

上面的程式碼全部組裝起來,就是第一版爬蟲的雛形了。運作的 ok 沒什麼問題,就只是速度比較慢而已。而且 queue 增長的速度比想像中驚人,我跑了一個晚上 queue 大概就多了十萬筆資料,而 users 裡面卻只有四五千筆而已。

不過在跑了一個晚上之後,我發現了一個致命的錯誤。

第二版爬蟲:判斷中文

這個致命的錯誤就是當初的預設:「中文作者的 follower 都是中文作者」是有問題的,而且仔細想想會發現這個預設的確很不可靠。

所以跑了一個晚上的爬蟲,我發現資料庫裡面多了一大堆外國使用者。而且一但多了一個,你的 queue 裡面就會出現一大堆的外國使用者。

為了避免這個情形,我決定從自介跟暱稱下手,寫一個判斷自介跟暱稱是否含有中文的函式,如果有中文才被放進來。這邊我直接複製在 Stack Overflow 上找到的程式碼,看起來十分神奇:

function isChinese(text = '') {
  // @see: https://stackoverflow.com/questions/44669073/regular-expression-to-match-and-split-on-chinese-comma-in-javascript/51941287#51941287
  const regex = /(\p{Script=Hani})+/gu;
  return text.match(regex)
}

在 queue 裡面抓完使用者資料後會進行判斷:

const info = await getUserInfo(userId)

// 非中文,直接略過
if (!isChinese(info.bio) && !isChinese(info.name)) {
 continue;
}

做這個判斷的時候我就已經想到會有一個問題,那就是有些人他們喜歡國際化一點,在自介會放全英文,暱稱也會是英文,所以會被誤判。明明就是用中文寫作,但是卻沒有被加進 queue 裡面。

這邊我當時覺得無所謂,畢竟這樣的人不多,而且要解的話有點麻煩。當時我腦中本來就有浮現一個解法,就是去抓他最近拍手過或發表過的文章,看看標題是不是中文,這樣的判斷會準確很多。但當時我懶得實作,想說先讓爬蟲繼續跑一天看看。

隔天早上,又發現了一個完全沒想過會碰到的問題。

第三版爬蟲:判斷日文

使用者清單裡面出現一大堆日本人。

因為他們有些暱稱是漢字,要嘛就是自介有漢字,所以不會被中文判斷篩掉。發現這個問題的時候我第一個想法是:「如果這是在面試我一定被刷掉,這種 case 居然當初沒想到...」。

為了解決這種情況,就再找了一個判斷是不是有日文(不含漢字)的正則表達式:

function isJapanese(text = '') {
  // @see: https://gist.github.com/ryanmcgrath/982242
  const regexJP = /[\u3040-\u309F]|[\u30A0-\u30FF]/g; 
  const jp = text.match(regexJP)
  if (jp && jp.length >= 3) {
    return true
  }
  return false
}

如果含有三個以上的日文字母,就回傳是日文。這邊會設定數量是我怕有些台灣人用什麼 之類的,就會被誤判。不過除了寫死數量以外,還有個比較好的做法可能是看比例,例如說一句話如果有八九成是中文字,就是中文之類的。

判斷邏輯的部分改成這樣:

const info = await getUserInfo(userId)

// 非中文,直接略過
if (!isChinese(info.bio) && !isChinese(info.name)) {
 continue;
}

if (isJapanese(info.bio) || isJapanese(info.name)) {
    continue;
}

如果不是中文就跳過,再來確認是不是日文,如果自介或是暱稱是日文也跳過。

好,這樣就沒有問題了吧!於是我把資料砍光,再讓爬蟲跑一個晚上試試看。

隔天起來,發現我真是天真的可以。

第四版爬蟲:直接重構

打開資料庫,發現還是有很多日本使用者。原因在於他們可能暱稱是用漢字,然後沒有寫自介,或者自介只有一兩個字之類的,所以還是會被判定為是中文使用者。

追根究底,都是這個判斷機制太不可靠的原因。

既然事情已經到這個地步,就沒辦法偷懶了,我只能實作剛開始提到的更準確的解法:「看看最近發表過或是拍手過的文章是不是中文」,而這部分的資料幸好原本的 API 就有提供,實作起來比想像中簡單許多。

除了這個以外,由於 queue 增長的速度比消耗的速度快太多,因此我一度改變了一下方法。我寫了另外一支小程式,把原本流程中的「把 followers 丟到 queue」拿掉,並且一次拿 10 筆使用者資料出來。

換句話說,這個新的小程式做的事情很簡單,就是不斷抓使用者資料並存到資料庫,這樣 queue 就會一直變小,讓使用者資料愈來愈多。大概一個小時可以抓兩萬筆,累積一個晚上的 queue 白天花半天就可以跑完。

好處就是我可以快速累積使用者資料,畢竟原本的實作太慢了,一天大概只能跑個一萬筆左右,現在新的實作因為不用把東西丟到 queue 裡面,會讓使用者資料長得很快。

那時候偷懶直接複製程式碼改一下就做完這個新的小程式,導致程式寫到這邊愈來愈亂,考量到之後想要 open source,是時候整理一下程式碼了,於是就順便把程式重構一下。

重構完的架構如下:

.
├── README.md     // 說明
├── app.js        // 主程式
├── getUsers.js   // 只抓使用者資料的小程式 
├── config.js     // 設定檔
├── db.js         // 資料庫相關
├── medium.js     // medium API 相關
├── package.json  
├── queue.js     
└── utils.js

我們先從 config 開始看起吧:

module.exports = {
  db: {
    connectionLimit: 10,
    host     : '',
    user     : '',
    password : '',
    database : 'medium',
    charset: 'utf8mb4'
  },
  batchLimit: 1, // 一次抓多少筆使用者資料
  randomDelay: function() {
    return Math.floor(Math.random() * 200) + 100
  },
  errorRateTolerance: 0.2,
  delayWhenError: 500
}

這邊就是放一些設定檔,包括資料庫的設定以及一些抓資料的參數,大多數都是跟抓使用者資料的那個小程式有關,例如說要抓幾筆,然後每一次要停多久之類的。這些都是為了避免送太多 request 被擋而做的措施。

再來看一下 utils.js:

module.exports = {
  // @see: https://stackoverflow.com/questions/44669073/regular-expression-to-match-and-split-on-chinese-comma-in-javascript/51941287#51941287
  isChinese: (text = '') => {
    const regex = /(\p{Script=Hani})+/gu;
    return text.match(regex)
  },

  // @see: https://gist.github.com/ryanmcgrath/982242
  isJapanese: (text = '') => {
    const regexJP = /[\u3040-\u309F]|[\u30A0-\u30FF]/g; 
    const jp = text.match(regexJP)

    // more than 2 japanese char
    if (jp && jp.length >= 2) {
      return true
    }
    return false
  },

  sleep: ms => new Promise(resolve => {
    setTimeout(resolve, ms)
  }),

  log: function () {
    const args = Array.prototype.slice.call(arguments);
    console.log.apply(console, args)
  }
}

這邊基本上就是把剛剛用到的一些函式搬過來統一放在這邊,日文字母的限制縮小為兩個,然後把 console.log 包裝了一下,想說之後要客製化比較方便。

然後是 medium.js,這邊是有關 medium API 的部分,並且新增了一個函式 isMandarinUser 來判斷是否是中文使用者:

const axios = require('axios')
const _ = require('lodash')
const utils = require('./utils')
const JSON_HIJACKING_PREFIX = '])}while(1);</x>'

// wrapper function, return null instead of throwing error
async function getMediumResponse(url) {
  try {
    const response = await axios.get(url)
    const json = JSON.parse(response.data.replace(JSON_HIJACKING_PREFIX, ''))
    return json
  } catch(err) {
    return null
  }
}

function isMandarinUser(name, bio, posts) {

  // if bio or name is japanese, must be japanese
  if (utils.isJapanese(name) || utils.isJapanese(bio)) {
    return false
  }

   // this user has no activity on medium, decide by name and bio
  if (!posts) {
    return utils.isChinese(name) || utils.isChinese(bio)
  }

  const contents = _.values(posts).map(item => item.title + _.get(item, 'content.subtitle'))
  return Boolean(
    contents.find(item => {
      return utils.isChinese(item) && !utils.isJapanese(item)
    })
  )
}

module.exports = {
  getFollowers: async (uid, to) => {
    let url = `https://medium.com/_/api/users/${uid}/profile/stream?source=followers&limit=200`
    if (to) {
      url += '&to=' + to
    }
    const json = await getMediumResponse(url)
    if (!json) {
      return null
    }
    const followers = _.keys(json.payload.references.Social) || []
    const nextTo = _.get(json, 'payload.paging.next.to')
    return {
      followers,
      nextTo
    }
  },

  getUserInfo: async (uid) => {
    const url = `https://medium.com/_/api/users/${uid}/profile/stream`
    const json = await getMediumResponse(url)
    if (!json) {
      return {}
    }
    const userId = _.get(json, 'payload.user.userId')
    const follower = _.get(json, `payload.references.SocialStats.${userId}.usersFollowedByCount`, 0)

    const posts = _.get(json, 'payload.references.Post')
    const name = _.get(json, 'payload.user.name')
    const bio = _.get(json, 'payload.user.bio')

    return {
      isMandarinUser: isMandarinUser(name, bio, posts),
      userId,
      name,
      username: _.get(json, 'payload.user.username'),
      bio,
      followerCount: follower,
      mediumMemberAt: _.get(json, 'payload.user.mediumMemberAt'),
      isWriterProgramEnrolled: _.get(json, 'payload.user.isWriterProgramEnrolled'),
      createdAt: _.get(json, 'payload.user.createdAt'),
    }
  }
}

isMandarinUser 會根據三個參數來決定:暱稱、自介以及相關文章。相關文章可能是使用者最近發表過的或者是回覆過與拍手過的文章,會根據文章的標題以及副標題來做判定。

如果使用者沒有任何活動的話,就會跟之前一樣採用自介跟暱稱來判定,所以還是有誤判的可能,但實測過後誤判率已經滿低的了。

接著來看與資料庫相關的操作,db.js:

const mysql = require('mysql')
const moment = require('moment')

function format(time) {
  if (!time) return null
  return moment(time).format('YYYY-MM-DD HH:mm:ss')
}

function transform(info) {
  return [
    info.userId, info.username, info.name, info.bio, info.followerCount,
    format(info.mediumMemberAt), format(info.createdAt), info.isWriterProgramEnrolled, null
  ]
}

class DB {
  constructor(config) {
    this.conn = mysql.createPool(config)
  }

  getExistingUserIds() {
    return new Promise((resolve, reject) => {
      this.conn.query('SELECT userId from Users', (err, results) => {
        if (err) {
          return reject(err)
        }
       return resolve(results.map(item => item.userId))
      });
    })
  } 

  getUserIds(limit) {
    return new Promise((resolve, reject) => {
      this.conn.query('SELECT userId from Users where fr="TW" order by follower desc limit ' + limit, (err, results) => {
        if (err) {
          return reject(err)
        }
       return resolve(results.map(item => item.userId))
      });
    })
  } 

  deleteUserIds(userIds) {
    return new Promise((resolve, reject) => {
      this.conn.query('DELETE from Queues WHERE userId IN (?)', [userIds], (err, results) => {
        if (err) {
          return reject(err)
        }
        return resolve(userIds)
      })
    })
  }

  insertUserData(info) {
    if (!info) return
    const data = Array.isArray(info) ? info.map(transform) : [transform(info)]
    this.conn.query(`
      INSERT INTO Users
      (
        userId, username, name, bio, follower,
        mediumMemberAt, createdAt, isWriterProgramEnrolled, fr
      ) VALUES ?`, [data], (err) => {
        if (err) {
          // console.log(err)
        }
      }
    )
  }

  insertIntoQueue(list) {
    return new Promise((resolve, reject) => {
      const values = []
      for (let item of list) {
        values.push([item])
      }
      this.conn.query(`
        INSERT IGNORE INTO Queues (userId) VALUES ?`, [values], (err) => {
          if (err) {
            // console.log(err)
          }
          resolve()
        }
      )
    })
  }
}

module.exports = DB

基本上就是把一大堆 SQL query 包裝成 Promise 以及 function,方便其他的 module 來使用。大部分的函式都能夠接收一個 array 來做批次操作,這樣會更有效率一點。

而且把這些東西包裝起來之後,queue 的程式碼就會變得非常單純:

class Queue {
  constructor(db) {
    this.db = db
  }

  async get(limit) {
    const items = await this.db.getUserIds(limit)
    await this.db.deleteUserIds(items)
    return items
  }

  async push(list) {
    await this.db.insertIntoQueue(list)
  }
}

module.exports = Queue

最後來看一下我們的主程式 app.js,在重構之後程式碼變得乾淨很多,可讀性也提昇了不少:

const DB = require('./db')
const Queue = require('./queue')
const config = require('./config')
const medium = require('./medium')
const utils = require('./utils')

async function main() {
  const db = new DB(config.db)
  const queue = new Queue(db)
  const existingUserIds = await db.getExistingUserIds()
  const userIdMap = {}
  for (let userId of existingUserIds) {
    userIdMap[userId] = true
  }

  utils.log('Existing userId:', existingUserIds.length)

  while(true) {
    const userIds = await queue.get(1)
    if (userIds.length === 0) {
      utils.log('Done')
      break
    }

    const userId = userIds[0]
    if (userIdMap[userId]) {
      continue
    }
    userIdMap[userId] = true
    utils.log('userId:', userId)

    try {
      const userInfo = await medium.getUserInfo(userId)

      if (!userInfo.userId) {
        utils.log('getUerrInfo error, sleep for', config.delayWhenError)
        await utils.sleep(config.delayWhenError)
      }

      if (!userInfo.isMandarinUser) {
        utils.log(userId, 'not MandarinUser')
        continue
      }

      db.insertUserData(userInfo)

      if (userInfo.followerCount > 0) {
        let to = undefined
        let count = 0
        while (true) {
          const data = await medium.getFollowers(userInfo.userId, to)
          if (!data) {
            break
          }
          const { nextTo, followers } = data
          to = nextTo
          count += followers.length
          utils.log(userInfo.userId, 'fetching', count, 'followers')
          await queue.push(followers.filter(uid => !userIdMap[uid]))
          if (followers.length === 0 || !to) {
            break
          }
        }
      }
    } catch (err) {
      utils.log('sleep for', config.delayWhenError)
      utils.log(err)
      await utils.sleep(config.delayWhenError)
    }
  }
  process.exit()
}

main()

這邊有個機制與之前不一樣,之前是每次從 queue 拿一個 userId 出來就去資料庫確認一下是否爬過,但是這樣太沒有效率。在這個版本改成程式執行時就直接從資料庫裡面把所有資料拿出來,並且變成一個 map,如果有值的話就代表已經抓取過,反之亦然。

重構過的程式碼把 module 切開之後看起來順眼很多,而且要改什麼都很容易,沒有重構過的話我還真不敢 open source 出去...

這邊是重構完的程式碼:https://github.com/aszx87410/medium-user-crawler

總結

在寫爬蟲的過程中也是踩了滿多坑的,其中最麻煩的就是語言判斷那一塊,當初沒有想到日文漢字這個 case 要判斷,花了不少時間。偷懶也花了很多時間,原本偷懶不想用更精確的方法來做判定,沒想到最後還是得用,中間浪費了不少時間。

這爬蟲還有滿多地方可以改進的,例如說執行速度的部分,或者是判定語言的部分,目前是我把資料撈出來之後手動標是香港、台灣還是中國,但或許可以寫一些小程式來自動判定,例如說簡體就是中國,有出現一些粵語的字就是香港,反之則是台灣等等,雖然不一定準確,但至少用程式來輔助會方便很多。

這篇主要是分享一下我寫這個爬蟲的歷程,其實只要資料來源能確定抓得到,其他都不是什麼大問題。再加上這個爬蟲沒有很完整(例如說沒有 retry 機制),所以花個一兩天就能夠實作完成了。

希望這篇有吸引到大家,也很希望大家能試試看自己爬資料,做出有趣的數據分析!

關於作者:
@huli 野生工程師,相信分享與交流能讓世界變得更美好


#medium









Related Posts

關於 Markdown 格式

關於 Markdown 格式

YDKB study

YDKB study

[Day 5] JS in Pipeline (5): Testing, Linting, and Git Hooks

[Day 5] JS in Pipeline (5): Testing, Linting, and Git Hooks




Newsletter




Comments