BatchImporter
1. BatchImporter 概述
批量匯入工具用於將歷史數據或外部數據匯入神策分析進行使用。即時數據匯入請使用 LogAgent。
使用批量匯入工具匯入的數據需要符合 數據格式,本文最後附錄也有簡單格式介紹。
2. 執行環境
批量匯入工具只能在部署神策分析的單機或叢集機器上使用。
3. 經典使用方法
本節介紹 BatchImporter 最常見的使用方法,其他功能請參考“4.使用方法詳解”。
步驟如下:
a 將數據放於某個路徑,比如需要匯入的數據在/home/work/data 下,有part-001、part-002、part-003(命名可以隨意)幾個檔案(每行一個符合 數據格式 的 Json )。
b 切換到 sa_cluster 帳號。
sudo su - sa_cluster
c 找到 batch_importer 。
# 可直接使用 batch_importer
batch_importer --help
# 如果提示上面的指令不存在, 則進入下面的目錄使用 bin/sa-importer
cd /home/sa_cluster/sp/tools/batch_importer
bin/sa-importer --help
d 執行批量匯入工具,檢查數據正確性,但不進行真的數據匯入。
注意:path 指向的是數據所在資料夾而不是檔案,該資料夾下的所有檔案都會被批量匯入工具讀取。
bin/sa-importer --path /home/work/data
執行後會顯示統計資訊,
Import session read 32 valid records, 0 lines can't be parsed, 0 records invalid.
這裡說明有32條有效數據,0條數據不可解析,0條可解析但數據無效。若有無效數據,將會在 Log 裡展現。
e 經過步驟4檢查數據都沒問題以後,進行真正的數據匯入。
bin/sa-importer --path /home/work/data --import --session new
當出現如下資訊時說明匯入結束。
Send 32 records to kafka
Import /home/work/data completed.
注意:
- sa_cluster 需要有數據目錄和檔案連接權限,可以切換到 sa_cluster 後 tail 一下數據檔案看是否能打開。
- 匯入後清理數據較複雜,請檢查好再操作。對同一份數據多次執行匯入會導致數據重複。
- 批量匯入工具的 Log 在神策分析的目錄下,一般是在 /data/sa_cluster/logs/batch_importer 。由於數據有問題被過濾的數據將額外儲存在上述 Log 目錄的 invalid_records 中。
- 批量匯入工具透過 --path 參數指定要匯入的目錄,並匯入目錄下所有的檔案。請在啟動匯入後不要增刪、修改目錄下的檔案,否則無法保證匯入結果符合預期。
- 指定數據匯入的專案有兩種方法:
- 在數據中增加 `project` 欄位(詳見 數據格式 ),使用這種方式可以一次匯入多個專案的數據;
- 啟動匯入時,增加 `--project` 參數。所有數據無論是否指定 `project` 欄位都將匯入到參數設定的專案中。
常見問題:
- distinct_id 是一個字串 "123123" ,所給的數據裡卻是數值 123123 (沒有雙引號)。。
- 某個 property 欄位的型別與之前數據的型別不符,比如之前是字串,後來是數值。
4. 使用方法詳解
4.1. 呼叫和參數
在批量匯入工具的部署路徑或其他路徑下執行 sa-importer 。
sa-importer [參數]
參數簡介:
參數 | 說明 |
---|---|
--help | 列印使用说明 |
--type_define | properties 中各欄位型別限定 |
--import | 是否執行匯入,若不配置該參數,執行只做校驗 |
--speed_limit | 匯入速度限制,單位是 條/秒,預設不限速 |
--sample | 輸出一些範例數據。 |
--path | 需要匯入數據的路徑 |
--session | 本次執行匯入的 Session 文件。如果是一次新的匯入任務,請設定為 new |
--parser | 預設 Json ,一般請勿使用該參數 |
--manifest | 批量匯入工具執行结束後,使用參數值作為路徑輸出 manifest 檔案。若啟動時該路徑已經有檔案那麼啟動失敗。 |
--project | 將數據匯入的專案。請注意,若指定該數據,數據中的 project 欄位將無效,本次執行的所有數據都將匯入到 --project 指定的專案。 |
--expired_record_filter_after_hour | 允許匯入的數據時間區間截至未來的小時數,預設為 1,即超過未來 1 小时的數據將被過濾。 |
--expired_record_filter_before_hour | 允許匯入的數據時間區間向前的小時數,預設為 17520,即超過 2 年以前的數據將被過濾。 |
批量匯入工具兩種執行模式:校驗數據模式 和 匯入數據模式。
4.2. 校驗數據模式
由於數據匯入是一個複雜的過程,所以我們希望匯入數據前用戶可以對數據先進行簡單的校驗工作,主要是校驗數據是否符合最基本的規範(見概述中數據格式的描述)。
批量匯入工具校驗功能的使用流程如下:
- 將需要檢驗的數據放入部署神策分析系統的某台機器中,該目錄中不要包含其他不需要檢驗的無關文件,例如路徑是 `/home/work/data` 。
- 執行批量匯入工具:
bin/sa-importer --path /home/work/data
執行結束會顯示統計:
Import session read 33128 valid records, 3 lines can't be parsed, 2 records invalid.
表示有 33128 條數據符合格式,3 條數據無法解析,2 條數據有問題。
若希望 BatchImporter 遇到錯誤數據立刻停止執行,可以增加參數 `--debug_exit_with_bad_record`。例如:
bin/sa-importer --path /home/work/data --debug_exit_with_bad_record
4.3. 匯入數據模式
匯入數據的過程分為啟動一個新的匯入任務和恢復舊的匯入任務。
- 匯入數據模式的標誌是 --import 參數,如果不加該參數則執行模式為4.2中介紹的校驗數據。
- 匯入數據模式必須使用 --session <SessionFile> 參數顯式的指定 SessionFile ,如果是一次新匯入任務,請設定 SessionFile 的值為 new。
4.3.1. 新建匯入任務
1. 將需要檢驗的數據放入部署神策分析系統的某台機器中,該目錄中不要包含其他不需要檢驗的無關文件,例如路徑是 /home/work/data 。
2. 執行批量匯入工具:使用 --import --session new ,必須指定路徑 --path 。
bin/sa-importer --path /home/work/data --import --session new
剛開始執行或 Ctrl+C 中斷都會顯示本次的 SessionID :
啟動時:Importer Session File: '2015-06-19-18-19-50.session'
Ctrl+C退出時:Import session file: '2015-06-19-18-19-50.session', you can run importer with arg '-session 2015-06-19-18-19-50.session' to continue import.
4.3.2. 恢復匯入任務
如果某次數據匯入任務被中止,使用 SessionFile 可以進行該任務的恢復:
1. 例如某次匯入 /home/work/data 的匯入任務被中止。
2. 執行批量匯入工具:使用 --import,必須指定 --session <SessionID>,不能指定 --path。
bin/sa-importer --import --session 2015-06-19-18-19-50.session
注意:如果要恢復匯入任務,那麼之前使用的目錄下不能有任何檔案變動(修改內容,增加檔案、刪除檔案),否則將啟動失敗。如需追加匯入內容,請將數據放到其他目錄,並使用“新建匯入任務”。
4.4. manifest 檔案内容解讀
若使用了 manifest 參數指定了 manifest 文件,匯入執行結束後會在參數值目錄產生 manifest 文件,該文件包含匯入的基本統計資訊,可用於自動化腳本,作為 done 文件(匯入結束的標誌)。
- 若啟動匯入時 manifest 參數值所指路徑檔案已經存在,那麼匯入啟動會失敗。
- 產生 manifest 不是必須的,該文件內容可用於除錯和判斷匯入過程是否結束,並簡單校驗匯入正確性。
bin/sa-importer --path /home/work/data --session new --import --manifest info_2015_12_22
產生的 info_2015_12_22 檔案範例:
{
"session_id" : "1", // 匯入的SessionID
"do_import" : true, // 是否匯入數據,false 為只校驗數據
"complete" : true, // 是否執行成功,false 可能是人為或異常中斷
"read_files" : [ "/home/work/data/a", "/home/work/data/b" ], // 實際讀取的檔案列表
"plan_files" : [ "/home/work/data/a", "/home/work/data/b" ], // 目錄下應該讀取的檔案列表
"valid_count" : 209, // 有效數據條數
"total_count" : 209, // 總讀取條數
"progress" : {
"synced_source_progress" : { // 進度資訊
"f" : "(dev=801,ino=1055397)",
"o" : 32232,
"n" : "b",
"s" : 208,
"c" : 208,
"e" : "1"
},
"sended_source_progress" : { // 進度資訊
"f" : "(dev=801,ino=1055397)",
"o" : 32232,
"n" : "b",
"s" : 208,
"c" : 208,
"e" : "1"
},
"kafka_progress" : { // kafka進度資訊
"0" : {
"offset" : 22435,
"partition_id" : 0,
"update_timestamp" : 1450771040053
},
"1" : {
"offset" : 22838,
"partition_id" : 1,
"update_timestamp" : 1450771045419
},
"2" : {
"offset" : 23185,
"partition_id" : 2,
"update_timestamp" : 1450771040071
}
},
"last_update_time" : 1450771042587, // 上次更新統計時間
"last_sync_time" : 1450771045419, // 上次寫kafka時間
"status" : {
"start_times" : 1,
"this_time_start_running_time" : 1450771040213, // 啟動匯入時間
"sending_speed" : 0.0,
"sending_records_in_store" : 0,
"counter_filtered_by_expired_time" : 0,
"counter_invalid_log_entry" : 0,
"counter_invalid_reader_log_entry" : 0,
"sent_to_kafka" : 209,
"raw_read_count" : 209,
"message_counter" : {
"counter_map" : { }
}
}
}
}
5. 注意事項
- 執行批量匯入工具匯入的數據不易清除,請謹慎操作。
- 批量匯入工具讀取檔案的順序是按照指定資料夾中檔案名的字典序。
如果SensorsAnalytics有正在執行的即時數據流,請設定限速以免影響即時數據,設定的方法是加參數 `--speed_limit <limit>` ,例如 `--speed_limit 300` 。
附錄 I. 數據格式
另外有專門頁面介紹數據格式,請參考 數據格式
需要匯入的文件每行為一條如下格式的 JSON :
{"type":"track","properties":{"propertie1":"value1","propertie2":"value2"},"event":"EventName","distinct_id":"DistinctId","original_id":"OriginalId","time":1431353191267}
屬性簡介:
屬性名 | 要求 | 含義 |
---|---|---|
type | 必須欄位,值只能是下表中給出的 | 這條記錄的型別 |
properties | 必須欄位,JSON 的 k-v 對 | Event 或 Profile 關聯的屬性 |
event | type 為 track 類時為必須欄位,profile 類時不需設定。字串 | Event 的名字 |
distinct_id | 必須欄位。字串 | 用戶的固定且唯一標識 |
original_id | type 為 track_signup 時為必須欄位,其他 type 該欄位沒有用。字串 | 註冊之前的隨機匿名ID |
time | 必須欄位。unix 時間戳,精確到毫秒! | 這條記錄對應的時間 |
type:這條數據的型別,可以是如下型別中的一種
Type | 解釋 |
---|---|
track | 一個 Event 及關聯的 Properties |
track_signup | 跟蹤用戶的註冊行為,該介面與 track 基本相同,除了多了一個 original_id 參數。 |
profile_set | 直接設定一個用戶的 Profile,如果用戶或者 Profile 已存在則覆蓋,不存在則自動建立。 |
profile_append | 追加一個用戶的某個 List 型別的 Profile,如果用戶或者 Profile 不存在則自動建立。 |
profile_increment | 增加或減少一個用戶的某個 Numeric 型別的 Profile,如果用戶不存在則自動建立, Profile 不存在則預設為 0。 |
profile_delete | 刪除一個用戶。 |
profile_unset | 刪除一個用戶的某個特定的 Profile。 |
- properties:Event 或 Profile 關聯的屬性,Key-Value 結構。 Key 必須為字串, Value 的型別可以是字串、整數、浮點數、布林、字串陣列。 Key 中字元只能為大小寫字母、數字和下底線。 SensorsAnalytics 中的屬性數據型別定義,及長度限制,詳見:數據格式 。
- 注意:每個 property 的型別需要確保從始至終都是同一個。如一開始為 NUMBER ,之後不能變為 STRING
- event:Event 的名字。如果 type 是 track 類,說明這是一條 event 類型數據,需要包含該欄位,否則這條數據將無效被過濾。
- original_id:在用戶註冊之前所使用的隨機匿名 ID。
- distinct_id: 用戶的固定且唯一標識,例如用戶表的主鍵等,一般應當是由產品的註冊行為回傳。
- time:這條數據對應的時間,單位為毫秒。 2015/6/19 17:36:11 對應 1434706571000。