雖然批處理解決方案可以解決大量數據用例,但在某些情況下,必須使用流式處理解決方案。借助Google Cloud,您可以通過無伺服器服務實現這一目標,只需為實際使用的資源付費。在本文中,我們將瞭解Google Cloud架構的工作原理:它檢索 HTTPS 請求作為輸入,在這些請求到達后立即處理這些請求,並將這些請求的輸出寫入數據倉庫。我們將重點介紹體系結構中保證消息數與輸出相同數量的部分。
為了讓事情更清楚,以下是我們的架構所做的:
有了更多細節和 GCP 工具,它看起來像這樣:
在我們的架構中,傳入的請求由雲函數收集,該函數將讀取請求,檢查格式是否有效,然後通過發送到 Pub/Sub 主題的消息傳輸資訊。我們之所以選擇使用 Cloud Function,是因為要執行的代碼非常簡單,並且該服務能夠根據傳入流量的演變來增加其實例數量,而不必擔心。
因此,我們在 Pub/Sub 主題和 Cloud Run 之間創建了一個“推送”訂閱。我們選擇了“推送”配置來保留架構的流式處理特性。然後,該消息由 Cloud Run 處理,如有必要,這可能需要比 Cloud Function 更長的時間。然後,Cloud Run 會將其處理輸出寫入 BigQuery 表。
通過將獲取與處理分開,我們確保儘快處理使用者作為輸入發送的請求。然後,處理可能需要更長的時間才能完成其工作,因為延遲不太重要。事實上,如果工作負載變得過於繁重,Pub/Sub 訂閱將保留消息並將其發送回去,直到 Cloud Run 處理完它們。
通過這種方式,我們可以保證使用者不會遇到任何特定的延遲,即使在流量高峰期也是如此。我們還可以確定所有「有效」消息都將由我們的架構處理,因為 Pub/Sub 保證接收者將發送和接收至少一次消息(在我們的例子中是 Cloud Run)。
經過幾天的測試,我們意識到我們的架構確實在處理所有傳入的消息,但有些消息存在於 BigQuery 表中的多個副本中。這意味著我們的 Cloud Run 正在多次處理相同的消息。為什麼?經過多次調查,我們發現大多數重複項都是在流量高峰期到達 BigQuery 表中的。在請求激增期間,Cloud Run 實例的數量會迅速增加。因此,Pub/Sub 訂閱可以向不同的實例發送相同的消息,因為此服務保證至少傳遞一次消息。如果您希望將 Pub/Sub 限制為發送單個消息,這是可能的,但只能通過「拉取」訂閱,這意味著您現在面對的是 Batch 架構。
為了解決資料庫中重複消息的問題,我們有兩種選擇:要麼在處理過程中過濾消息,刪除那些已經處理過的消息,要麼使用具有一定頻率的 SQL 查詢清理資料庫。我們決定實施第一個解決方案。之所以選擇這種選擇,是因為我們希望盡可能地保持即時方法。但是,隨著 SQL 查詢的執行,我們將不得不批量工作。此外,對大量數據進行常規SQL查詢可能會產生大量成本。
為了解決這個問題,我們不得不使用新的 GCP 服務,您可以在我們的架構圖上看到。
我們在Google Cloud上設置了一個名為Memorystore的 Redis 實例。此服務用作緩存:一旦消息被成功處理,我們就會將消息ID作為密鑰寫入Memorystore。然後,一旦新消息到達,我們就會查詢 Memorystore 實例,以查看消息 ID 是否已存在於資料庫中。如果是,我們不會處理該消息,因為這意味著它已被 Cloud Run 處理。如果消息 ID 在 Memorystore 中不存在,我們將處理消息並將 ID 寫入實例,從而表示消息剛剛被處理。
在 Memorystore 中寫入金鑰時,還可以為其分配到期日期。在我們的示例中,我們將此值設置為15分鐘,因為不需要將消息ID保留超過該時間。
我們還使用了 VPC 來確保 Cloud Run 和 Memorystore 實例之間的安全連接。為了加強安全性,我們啟動了 Memorystore 實例的身份驗證要求,這意味著我們需要一個安全密鑰才能與之通信。為了安全地存儲此密鑰,我們將其放置在 Google Cloud 的 Secret Manager 中,我們直接從 Cloud Run 代碼中調用它。
為什麼我們使用 Redis 而不是另一個資料庫?首先,我們想要一個“key:value”資料庫,它允許我們非常快速地檢索密鑰。然後,我們知道我們只需要在某個時間間隔內使用這些密鑰,因此我們需要一個資料庫,允許我們輸入密鑰的到期日期。這就是我們選擇Google Cloud的Memorystore服務的原因。警告:在我們的案例中,使用 Memorystore 很有用,因為我們有大量數據,但重要的是要指定它的使用必須適應用例,因為在其最小配置中,該服務每月花費 70 美元。
經過數周的測試,我們的架構平均每秒處理 1,500 個請求,有些峰值為 3,000 個。我們能夠觀察到,我們使用 Memorystore 的重複檢查系統根本沒有增加 Cloud Run 處理的請求的延遲。我們還發現,我們的系統平均每天檢測到 5,000 到 15,000 條重複消息,在每天大約 7000 萬條消息中,有些峰值為每天 300,000 條。更重要的是,所有消息現在只傳遞到資料庫一次。
關於我們的架構,有幾點可以修改或改進。
如果您的第一個收集步驟需要您調整大量參數(請求數、每個實例的 CPU 和記憶體數等),並且您希望將 Docker 用作部署工具,那麼將我們的 Cloud Function 替換為 Cloud Run 可能更適合您的用例。
如果您希望收集點檢索來自地理位置分散的外部使用者的請求,請考慮在使用者和收集點之間設置負載均衡器(Cloud Run 或 Cloud Function)。此外,藉助負載均衡器,您可以輕鬆集成 Cloud Armor(Google Cloud 的 WAF),以及管理您的子功能變數名稱。
最後,如果您的數據處理量很輕,甚至不存在,並且您不想使用 Docker 來簡化部署,則可以將我們的 Cloud Run 替換為 Cloud Function。
在我們的月度時事通訊《Tea O'Clock》中發現所有最新新聞、文章、網路研討會重播和 55 項活動。