在使用前,請先閱讀 數據模型數據格式 的介紹。

1. BatchImporter 概述

批量匯入工具用於將歷史數據或外部數據匯入神策分析進行使用。即時數據匯入請使用 LogAgent

使用批量匯入工具匯入的數據需要符合 數據格式,本文最後附錄也有簡單格式介紹。

2. 執行環境

批量匯入工具只能在部署神策分析的單機或叢集機器上使用。

3. 經典使用方法

本節介紹 BatchImporter 最常見的使用方法,其他功能請參考“4.使用方法詳解”。

步驟如下:

a 將數據放於某個路徑,比如需要匯入的數據在/home/work/data 下,有part-001、part-002、part-003(命名可以隨意)幾個檔案(每行一個符合 數據格式 的 Json )。

b 切換到 sa_cluster 帳號。

sudo su - sa_cluster
CODE

c 找到 batch_importer 。

# 可直接使用 batch_importer
batch_importer --help
# 如果提示上面的指令不存在, 則進入下面的目錄使用 bin/sa-importer
cd /home/sa_cluster/sp/tools/batch_importer
bin/sa-importer --help
CODE


d 執行批量匯入工具,檢查數據正確性,但不進行真的數據匯入
注意:path 指向的是數據所在資料夾而不是檔案,該資料夾下的所有檔案都會被批量匯入工具讀取。

bin/sa-importer --path /home/work/data
CODE


執行後會顯示統計資訊,

Import session read 32 valid records, 0 lines can't be parsed, 0 records invalid.
CODE


這裡說明有32條有效數據,0條數據不可解析,0條可解析但數據無效。若有無效數據,將會在 Log 裡展現。

e 經過步驟4檢查數據都沒問題以後,進行真正的數據匯入。

bin/sa-importer --path /home/work/data --import --session new
CODE


當出現如下資訊時說明匯入結束。

Send 32 records to kafka
Import /home/work/data completed.
CODE


注意:

  • sa_cluster 需要有數據目錄和檔案連接權限,可以切換到 sa_cluster 後 tail 一下數據檔案看是否能打開。
  • 匯入後清理數據較複雜,請檢查好再操作。對同一份數據多次執行匯入會導致數據重複。
  • 批量匯入工具的 Log 在神策分析的目錄下,一般是在 /data/sa_cluster/logs/batch_importer 。由於數據有問題被過濾的數據將額外儲存在上述 Log 目錄的 invalid_records 中。
  • 批量匯入工具透過 --path 參數指定要匯入的目錄,並匯入目錄下所有的檔案。請在啟動匯入後不要增刪、修改目錄下的檔案,否則無法保證匯入結果符合預期。
  • 指定數據匯入的專案有兩種方法:
    • 在數據中增加 `project` 欄位(詳見 數據格式 ),使用這種方式可以一次匯入多個專案的數據;
    • 啟動匯入時,增加 `--project` 參數。所有數據無論是否指定 `project` 欄位都將匯入到參數設定的專案中。

常見問題:

  1. distinct_id 是一個字串 "123123" ,所給的數據裡卻是數值 123123 (沒有雙引號)。。
  2. 某個 property 欄位的型別與之前數據的型別不符,比如之前是字串,後來是數值。

4. 使用方法詳解

4.1. 呼叫和參數

在批量匯入工具的部署路徑或其他路徑下執行 sa-importer 。

sa-importer [參數]
CODE

參數簡介:

參數說明
--help列印使用说明
--type_defineproperties 中各欄位型別限定
--import是否執行匯入,若不配置該參數,執行只做校驗
--speed_limit匯入速度限制,單位是 條/秒,預設不限速
--sample輸出一些範例數據。
--path需要匯入數據的路徑
--session本次執行匯入的 Session 文件。如果是一次新的匯入任務,請設定為 new
--parser預設 Json ,一般請勿使用該參數
--manifest批量匯入工具執行结束後,使用參數值作為路徑輸出 manifest 檔案。若啟動時該路徑已經有檔案那麼啟動失敗。
--project將數據匯入的專案。請注意,若指定該數據,數據中的 project 欄位將無效,本次執行的所有數據都將匯入到 --project 指定的專案。
--expired_record_filter_after_hour允許匯入的數據時間區間截至未來的小時數,預設為 1,即超過未來 1 小时的數據將被過濾。
--expired_record_filter_before_hour允許匯入的數據時間區間向前的小時數,預設為 17520,即超過 2 年以前的數據將被過濾。


批量匯入工具兩種執行模式:校驗數據模式 和 匯入數據模式

4.2.  校驗數據模式

由於數據匯入是一個複雜的過程,所以我們希望匯入數據前用戶可以對數據先進行簡單的校驗工作,主要是校驗數據是否符合最基本的規範(見概述中數據格式的描述)。

批量匯入工具校驗功能的使用流程如下:

  1. 將需要檢驗的數據放入部署神策分析系統的某台機器中,該目錄中不要包含其他不需要檢驗的無關文件,例如路徑是 `/home/work/data` 。
  2. 執行批量匯入工具:
bin/sa-importer --path /home/work/data
CODE


執行結束會顯示統計:

Import session read 33128 valid records, 3 lines can't be parsed, 2 records invalid.
CODE

表示有 33128 條數據符合格式,3 條數據無法解析,2 條數據有問題。

若希望 BatchImporter 遇到錯誤數據立刻停止執行,可以增加參數 `--debug_exit_with_bad_record`。例如:

bin/sa-importer --path /home/work/data --debug_exit_with_bad_record
CODE

4.3. 匯入數據模式

匯入數據的過程分為啟動一個新的匯入任務和恢復舊的匯入任務。

  • 匯入數據模式的標誌是 --import 參數,如果不加該參數則執行模式為4.2中介紹的校驗數據。
  • 匯入數據模式必須使用 --session <SessionFile> 參數顯式的指定 SessionFile ,如果是一次新匯入任務,請設定 SessionFile 的值為 new。

4.3.1. 新建匯入任務

1. 將需要檢驗的數據放入部署神策分析系統的某台機器中,該目錄中不要包含其他不需要檢驗的無關文件,例如路徑是 /home/work/data 。
2. 執行批量匯入工具:使用 --import --session new ,必須指定路徑 --path 。

bin/sa-importer --path /home/work/data --import --session new
CODE

剛開始執行或 Ctrl+C 中斷都會顯示本次的 SessionID :

啟動時:Importer Session File: '2015-06-19-18-19-50.session'

Ctrl+C退出時:Import session file: '2015-06-19-18-19-50.session', you can run importer with arg '-session 2015-06-19-18-19-50.session' to continue import.

4.3.2. 恢復匯入任務

如果某次數據匯入任務被中止,使用 SessionFile 可以進行該任務的恢復:

1. 例如某次匯入 /home/work/data 的匯入任務被中止。
2. 執行批量匯入工具:使用 --import,必須指定 --session <SessionID>,不能指定 --path

bin/sa-importer --import --session 2015-06-19-18-19-50.session
CODE


注意:如果要恢復匯入任務,那麼之前使用的目錄下不能有任何檔案變動(修改內容,增加檔案、刪除檔案),否則將啟動失敗。如需追加匯入內容,請將數據放到其他目錄,並使用“新建匯入任務”。

4.4. manifest 檔案内容解讀

若使用了 manifest 參數指定了 manifest 文件,匯入執行結束後會在參數值目錄產生 manifest 文件,該文件包含匯入的基本統計資訊,可用於自動化腳本,作為 done 文件(匯入結束的標誌)。

  • 若啟動匯入時 manifest 參數值所指路徑檔案已經存在,那麼匯入啟動會失敗。
  • 產生 manifest 不是必須的,該文件內容可用於除錯和判斷匯入過程是否結束,並簡單校驗匯入正確性。
bin/sa-importer --path /home/work/data --session new --import --manifest info_2015_12_22
CODE

產生的 info_2015_12_22 檔案範例:

{
  "session_id" : "1",  // 匯入的SessionID
  "do_import" : true,  // 是否匯入數據,false 為只校驗數據
  "complete" : true,   // 是否執行成功,false 可能是人為或異常中斷
  "read_files" : [ "/home/work/data/a", "/home/work/data/b" ],  // 實際讀取的檔案列表
  "plan_files" : [ "/home/work/data/a", "/home/work/data/b" ],  // 目錄下應該讀取的檔案列表
  "valid_count" : 209,  // 有效數據條數
  "total_count" : 209,  // 總讀取條數
  "progress" : {
    "synced_source_progress" : {  // 進度資訊
      "f" : "(dev=801,ino=1055397)",
      "o" : 32232,
      "n" : "b",
      "s" : 208,
      "c" : 208,
      "e" : "1"
    },
    "sended_source_progress" : {  // 進度資訊
      "f" : "(dev=801,ino=1055397)",
      "o" : 32232,
      "n" : "b",
      "s" : 208,
      "c" : 208,
      "e" : "1"
    },
    "kafka_progress" : {  // kafka進度資訊
      "0" : {
        "offset" : 22435,
        "partition_id" : 0,
        "update_timestamp" : 1450771040053
      },
      "1" : {
        "offset" : 22838,
        "partition_id" : 1,
        "update_timestamp" : 1450771045419
      },
      "2" : {
        "offset" : 23185,
        "partition_id" : 2,
        "update_timestamp" : 1450771040071
      }
    },
    "last_update_time" : 1450771042587,  // 上次更新統計時間
    "last_sync_time" : 1450771045419,    // 上次寫kafka時間
    "status" : {
      "start_times" : 1,
      "this_time_start_running_time" : 1450771040213,  // 啟動匯入時間
      "sending_speed" : 0.0,
      "sending_records_in_store" : 0,
      "counter_filtered_by_expired_time" : 0,
      "counter_invalid_log_entry" : 0,
      "counter_invalid_reader_log_entry" : 0,
      "sent_to_kafka" : 209,
      "raw_read_count" : 209,
      "message_counter" : {
        "counter_map" : { }
      }
    }
  }
}
CODE


5. 注意事項

  1. 執行批量匯入工具匯入的數據不易清除,請謹慎操作。
  2. 批量匯入工具讀取檔案的順序是按照指定資料夾中檔案名的字典序。
  3. 如果SensorsAnalytics有正在執行的即時數據流,請設定限速以免影響即時數據,設定的方法是加參數 `--speed_limit <limit>` ,例如 `--speed_limit 300` 。

附錄 I. 數據格式

另外有專門頁面介紹數據格式,請參考 數據格式

需要匯入的文件每行為一條如下格式的 JSON :

{"type":"track","properties":{"propertie1":"value1","propertie2":"value2"},"event":"EventName","distinct_id":"DistinctId","original_id":"OriginalId","time":1431353191267}
CODE


屬性簡介:

屬性名要求含義
type必須欄位,值只能是下表中給出的這條記錄的型別
properties必須欄位,JSON 的 k-v 對Event 或 Profile 關聯的屬性
eventtype 為 track 類時為必須欄位,profile 類時不需設定。字串 Event 的名字
distinct_id必須欄位。字串用戶的固定且唯一標識
original_idtype 為 track_signup 時為必須欄位,其他 type 該欄位沒有用。字串註冊之前的隨機匿名ID
time必須欄位。unix 時間戳,精確到毫秒!這條記錄對應的時間

type:這條數據的型別,可以是如下型別中的一種

Type解釋
track一個 Event 及關聯的 Properties
track_signup跟蹤用戶的註冊行為,該介面與 track 基本相同,除了多了一個 original_id 參數。
profile_set直接設定一個用戶的 Profile,如果用戶或者 Profile 已存在則覆蓋,不存在則自動建立。
profile_append追加一個用戶的某個 List 型別的 Profile,如果用戶或者 Profile 不存在則自動建立。
profile_increment增加或減少一個用戶的某個 Numeric 型別的 Profile,如果用戶不存在則自動建立, Profile 不存在則預設為 0。
profile_delete刪除一個用戶。
profile_unset刪除一個用戶的某個特定的 Profile。


  • properties:Event 或 Profile 關聯的屬性,Key-Value 結構。 Key 必須為字串, Value 的型別可以是字串、整數、浮點數、布林、字串陣列。 Key 中字元只能為大小寫字母、數字和下底線。 SensorsAnalytics 中的屬性數據型別定義,及長度限制,詳見:數據格式
    • 注意:每個 property 的型別需要確保從始至終都是同一個。如一開始為 NUMBER ,之後不能變為 STRING
  • event:Event 的名字。如果 type 是 track 類,說明這是一條 event 類型數據,需要包含該欄位,否則這條數據將無效被過濾。
  • original_id:在用戶註冊之前所使用的隨機匿名 ID。
  • distinct_id: 用戶的固定且唯一標識,例如用戶表的主鍵等,一般應當是由產品的註冊行為回傳。
  • time:這條數據對應的時間,單位為毫秒。 2015/6/19 17:36:11 對應 1434706571000。