HdfsImporter
1. HdfsImporter 概述
HdfsImporter 與 BatchImporter 一樣用於將歷史數據或外部數據匯入神策分析。
2. 執行環境
HdfsImporter 僅可在神策分析叢集版使用,且只能執行在神策分析叢集的 Hadoop 環境中。
3. 經典使用方法
3.1. 一次新啟動的匯入步驟如下:
- 將數據放於神策分析叢集上的 HDFS 中。例如 HDFS 叢集 /data/input 目錄,其中包含檔案data01、data02、data03等檔案,檔案內容為每行一個符合 數據格式 的 Json。
切換到 sa_cluster 帳號:
sudo su - sa_cluster
CODE- 請確定 HDFS 上數據不再進行修改(增刪修改檔案)後,執行 HdfsImporter。
3.2. 使用方式
先檢查數據正確性,但不進行真的數據匯入:開始 debug 模式,數據只會以 `JSON` 文字的方式輸出到相應的 HDFS 目錄下,並且不會真正的匯入到神策系統當中。
hdfs_importer \ --path /data/input \ --project default \ --debug
BASH檢查無誤後,進行真正的數據匯入:
hdfs_importer \ --path /data/input \ --project default
BASH
3.3. 備註
- 1.13 版本以前,由於沒有簡略命令 hdfs_importer , 需使用 sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh 代替
- 匯入後清理數據較複雜,請檢查好再操作。對同一份數據多次執行匯入會導致數據重複。
- path 可以是一個包含子目錄的數據目錄。
- HdfsImporter 執行成功之後,數據大約會在1分鐘之後被查詢到。
- hdfs 操作命令文件參考
4. 設定項說明
列出所有設定項:
hdfs_importer --help
CODE參數 必填 說明 預設值 path 是 需要匯入的數據所在的 HDFS 路徑。 project 否 期望匯入的 project 名稱,不指定的則匯入到 default。注:匯入數據中的 project 欄位會失效,以此參數指定為準。 default all_data_without_track_signup 否 是否所有數據(包括之前各種型別匯入)都不包含 track_signup 類型數據。若所有匯入數據均不包含 track_signup 類型數據,增加這個選項可以提高匯入速度。注意,匯入專案中使用過用戶關聯功能則不能使用該選項。
split_max_size_mb 否 MapReduce 每個 Mapper 處理分片的最大值。該值會影響 Mapper 數量。 512 job_queue_name 否 MapReduce Job 的佇列名。 split_reduce_size_mb 否 MapReduce 每個 Reducer 處理的數據量大小。該值會影響 Reducer 的數據。 Reducer 個數 = 輸入的總數據量 / split_reduce_size_mb 。 2048 mapper_max_memory_size_mb 否 MapReduce 每個 Mapper 使用的記憶體最大值。 1024 reduce_max_memory_size_mb 否 MapReduce 每個 Reducer 使用的記憶體最大值。 2048 reduce_output_file_size_mb 否 MapReduce 每個 Reducer 輸出的檔案最大值。 256 shuffle_partition_split_size 否 Shuffer 時,每個 Mapper 切分 partition 的大小,一般用來調整數據分佈不均勻 500000 expired_record_filter_after_hour 否 允許匯入的數據時間區間截至未來的小時數,預設為 1,即超過未來 1 小時的數據將被過濾。 1 expired_record_filter_before_hour 否 允許匯入的數據時間區間向前的小時數,預設為 17520,即超過 2 年以前的數據將被過濾。 17520 user_define_job_name 否 允許自定義 job name,比如在 HdfsImpoter-12345-[your_name] 中,your_name 是自定義的部分 無 debug 否 開始 debug 模式,數據只會以 JSON
文本的方式輸出到相應的 HDFS 目錄下,並且不會真正的匯入到神策系統當中。disable_import_succeeded_source_path 否 禁止重複匯入已成功過的匯入路徑 false write_import_info 否 在命令執行路徑產生 import info 檔案 false enable_first_time_processor 否 開啟首日首次修正 false 一個複雜的例子:
hdfs_importer \ --path /data/input \ --project test_project \ --all_data_without_track_signup \ --split_max_size_mb 128 \ --job_queue_name data_import_queue \ --split_reduce_size_mb 2048 \ --reduce_max_memory_size_mb 2048 \ --reduce_output_file_size_mb 256 \ --disable_import_succeeded_source_path
CODE
5. 查詢匯入歷史記錄
自1.13 版本,支援查詢 hdfs importer 匯入歷史記錄, 查詢結果以 JSON 格式輸出。
使用方式 :
hdfs_importer list \ --project default
BASH列出所有設置 :
hdfs_importer list --help
BASH參數 必填 說明 預設值 project_name 否 project name, 可指定按照 project name 查詢匯入任務記錄 session_id 否 session id, 可指定按照 session id 查詢匯入任務記錄 job_name 否 job name,可指定按照 job_name 查詢匯入任務記錄。這裡的 job_name 為匯入任務中的 user_define_job_name status 否 hdfs importer 匯入任務的狀態,status 值可為 WAITING,RUNNING,SUCCESS,FAILED start_time 否 start time, 可查詢startTime比傳入的指定時間晚的匯入記錄, 格式%Y-%m-%d %H:%M:%S max_num 否 recent N times,可指定最近N次匯入記錄 10 dump 否 查詢結果輸出到指定的FILENAME;如果沒有指定,則會輸出到控制台 full 否 指定後則查看所有任務,否則同一個session_id只查看最新的任務 False 例如,查詢按小時的例行匯入任務:
hdfs_importer list \ --project_name test_project \ --job_name hourly_import \ --status SUCCESS \ --start_time '2018-08-30 00:00:00' \ --max_num 2 \ --dump /data/work/list_output_file1
BASH輸出到檔案中的結果為:
[ { "id": 12, "session_id": 320, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}", "start_time": "2018-09-11 18:46:50", "end_time": "2018-09-11 18:49:46", "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0,\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-43c7d4ea-0b14-48f6-8b03-764178e927ae.log", "event_job_id": "job_1536029076029_3074", "profile_job_id": "job_1536029076029_3077", "event_job_status": "SUCCESS", "profile_job_status": "SUCCESS", "event_data_load_status": "SUCCESS", "project_name": "test_project" }, { "id": 10, "session_id": 317, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}", "start_time": "2018-09-11 10:23:20", "end_time": "2018-09-11 10:26:21", "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0,\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-67a00f94-67d8-415e-a004-c9ca82a17a2a.log", "event_job_id": "job_1536029076029_3044", "profile_job_id": null, "event_job_status": "SUCCESS", "profile_job_status": null, "event_data_load_status": "SUCCESS", "project_name": "test_project" } ]
BASH再如,查詢指定 session_id 的所有匯入任務狀態:
hdfs_importer list \ --session_id 306 \ --full
BASH輸出到控制台的結果為
[ { "id": 8, "session_id": 306, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"session\":\"HdfsImport-306\"}", "start_time": "2018-09-10 19:02:08", "end_time": "2018-09-10 19:02:46", "counter_info": "{\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-0634112a-6b90-40db-a26d-5492dbc7b995.log", "event_job_id": null, "profile_job_id": "job_1536029076029_3084", "event_job_status": null, "profile_job_status": "SUCCESS", "event_data_load_status": null, "project_name": "test_project" }, { "id": 7, "session_id": 306, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}", "start_time": "2018-09-10 18:58:45", "end_time": "2018-09-10 19:01:10", "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-c7514335-5a55-42b8-bfd3-0ad7a27ec1a3.log", "event_job_id": "job_1536029076029_3082", "profile_job_id": "job_1536029076029_3083", "event_job_status": "SUCCESS", "profile_job_status": "FAILED", "event_data_load_status": "SUCCESS", "project_name": "test_project" } ]
BASH
6. 更新日誌
6.1.1. 2018-09-13 (SA 版本:1.13)
- 匯入設定項中增加參數 disable_import_succeeded_source_path
- 匯入設定項中增加參數 write_import_info
- 可使用簡略的 hdfs_importer 命令替換 sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh
- 新增查詢匯入歷史子命令
6.1.2. 2017-11-29 (SA 版本:1.10.3329)
- 支援自定義 job_name 參數: user_define_job_name
6.1.3. 2017-07-10 (SA 版本:1.7.2628)
- 支援自定義 job_name 參數 mapper_max_memory_size_mb
- 支援用戶自定義 InputFormat,詳情請參考 github :HDFSImporter 自定義 InputFormat
6.1.4. 2017-07-04 (SA 版本:1.7.2614)
- 支援用戶自定義預處理模組,詳情請參考 github :HDFSImporter 預處理模組
- 新增 debug 模式