在使用前,請先閱讀數據模型數據格式的介紹。

1. HdfsImporter 概述

HdfsImporter 與 BatchImporter 一樣用於將歷史數據或外部數據匯入神策分析。

2. 執行環境

HdfsImporter 僅可在神策分析叢集版使用,且只能執行在神策分析叢集的 Hadoop 環境中。

3. 經典使用方法

3.1. 一次新啟動的匯入步驟如下:

  1. 將數據放於神策分析叢集上的 HDFS 中。例如 HDFS 叢集 /data/input 目錄,其中包含檔案data01、data02、data03等檔案,檔案內容為每行一個符合 數據格式 的 Json。
  2. 切換到 sa_cluster 帳號: 

    sudo su - sa_cluster
    CODE
  3. 請確定 HDFS 上數據不再進行修改(增刪修改檔案)後,執行 HdfsImporter。

3.2. 使用方式

  1.  先檢查數據正確性,但不進行真的數據匯入:開始 debug 模式,數據只會以 `JSON` 文字的方式輸出到相應的 HDFS 目錄下,並且不會真正的匯入到神策系統當中。

    hdfs_importer \
    --path /data/input \
    --project default \
    --debug
    BASH
  2. 檢查無誤後,進行真正的數據匯入:

    hdfs_importer \
    --path /data/input \
    --project default
    BASH

3.3. 備註

  • 1.13 版本以前,由於沒有簡略命令 hdfs_importer , 需使用 sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh 代替
  • 匯入後清理數據較複雜,請檢查好再操作。對同一份數據多次執行匯入會導致數據重複。
  • path 可以是一個包含子目錄的數據目錄。
  • HdfsImporter 執行成功之後,數據大約會在1分鐘之後被查詢到。
  • hdfs 操作命令文件參考


4. 設定項說明

  • 列出所有設定項:

    hdfs_importer --help
    CODE
    參數必填說明預設值
    path需要匯入的數據所在的 HDFS 路徑。
    project期望匯入的 project 名稱,不指定的則匯入到 default。注:匯入數據中的 project 欄位會失效,以此參數指定為準。default
    all_data_without_track_signup

    是否所有數據(包括之前各種型別匯入)都不包含 track_signup 類型數據。若所有匯入數據均不包含 track_signup 類型數據,增加這個選項可以提高匯入速度。注意,匯入專案中使用過用戶關聯功能則不能使用該選項。


    split_max_size_mbMapReduce 每個 Mapper 處理分片的最大值。該值會影響 Mapper 數量。512
    job_queue_nameMapReduce Job 的佇列名。
    split_reduce_size_mbMapReduce 每個 Reducer 處理的數據量大小。該值會影響 Reducer 的數據。 Reducer 個數 = 輸入的總數據量 / split_reduce_size_mb 。2048
    mapper_max_memory_size_mbMapReduce 每個 Mapper 使用的記憶體最大值。1024
    reduce_max_memory_size_mbMapReduce 每個 Reducer 使用的記憶體最大值。2048
    reduce_output_file_size_mbMapReduce 每個 Reducer 輸出的檔案最大值。256
    shuffle_partition_split_sizeShuffer 時,每個 Mapper 切分 partition 的大小,一般用來調整數據分佈不均勻500000
    expired_record_filter_after_hour允許匯入的數據時間區間截至未來的小時數,預設為 1,即超過未來 1 小時的數據將被過濾。1
    expired_record_filter_before_hour允許匯入的數據時間區間向前的小時數,預設為 17520,即超過 2 年以前的數據將被過濾。17520
    user_define_job_name允許自定義 job name,比如在 HdfsImpoter-12345-[your_name] 中,your_name 是自定義的部分
    debug開始 debug 模式,數據只會以 JSON 文本的方式輸出到相應的 HDFS 目錄下,並且不會真正的匯入到神策系統當中。
    disable_import_succeeded_source_path禁止重複匯入已成功過的匯入路徑false
    write_import_info在命令執行路徑產生 import info 檔案false
    enable_first_time_processor開啟首日首次修正false
  • 一個複雜的例子:

    hdfs_importer \
    --path /data/input \
    --project test_project \
    --all_data_without_track_signup \
    --split_max_size_mb 128 \
    --job_queue_name data_import_queue \
    --split_reduce_size_mb 2048 \
    --reduce_max_memory_size_mb 2048 \
    --reduce_output_file_size_mb 256 \
    --disable_import_succeeded_source_path
    CODE

5. 查詢匯入歷史記錄

自1.13 版本,支援查詢 hdfs importer 匯入歷史記錄, 查詢結果以 JSON 格式輸出。

  • 使用方式 :

    hdfs_importer list \
    --project default
    BASH
  • 列出所有設置 :

    hdfs_importer list --help
    BASH
    參數必填說明 預設值
    project_nameproject name, 可指定按照 project name 查詢匯入任務記錄
    session_idsession id, 可指定按照 session id 查詢匯入任務記錄
    job_namejob name,可指定按照 job_name 查詢匯入任務記錄。這裡的 job_name 為匯入任務中的 user_define_job_name
    statushdfs importer 匯入任務的狀態,status 值可為 WAITING,RUNNING,SUCCESS,FAILED
    start_timestart time, 可查詢startTime比傳入的指定時間晚的匯入記錄, 格式%Y-%m-%d %H:%M:%S
    max_numrecent N times,可指定最近N次匯入記錄10
    dump查詢結果輸出到指定的FILENAME;如果沒有指定,則會輸出到控制台
    full指定後則查看所有任務,否則同一個session_id只查看最新的任務False
  • 例如,查詢按小時的例行匯入任務:

    hdfs_importer list \
    --project_name test_project \
    --job_name hourly_import \
    --status SUCCESS \
    --start_time '2018-08-30 00:00:00' \
    --max_num 2 \
    --dump /data/work/list_output_file1
    BASH

    輸出到檔案中的結果為:

    [
        {
            "id": 12,
            "session_id": 320,
            "job_name": "hourly_import",
            "scheduler_job_record_id": null,
            "import_path": "/sa/tmp/hdfsImport",
            "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}",
            "start_time": "2018-09-11 18:46:50",
            "end_time": "2018-09-11 18:49:46",
            "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0,\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}",
            "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-43c7d4ea-0b14-48f6-8b03-764178e927ae.log",
            "event_job_id": "job_1536029076029_3074",
            "profile_job_id": "job_1536029076029_3077",
            "event_job_status": "SUCCESS",
            "profile_job_status": "SUCCESS",
            "event_data_load_status": "SUCCESS",
            "project_name": "test_project"
        },
        {
            "id": 10,
            "session_id": 317,
            "job_name": "hourly_import",
            "scheduler_job_record_id": null,
            "import_path": "/sa/tmp/hdfsImport",
            "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}",
            "start_time": "2018-09-11 10:23:20",
            "end_time": "2018-09-11 10:26:21",
            "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0,\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}",
            "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-67a00f94-67d8-415e-a004-c9ca82a17a2a.log",
            "event_job_id": "job_1536029076029_3044",
            "profile_job_id": null,
            "event_job_status": "SUCCESS",
            "profile_job_status": null,
            "event_data_load_status": "SUCCESS",
            "project_name": "test_project"
        }
    ]
    BASH
  • 再如,查詢指定 session_id 的所有匯入任務狀態:

    hdfs_importer list \
    --session_id 306 \
    --full
    BASH

    輸出到控制台的結果為

    [
        {
            "id": 8,
            "session_id": 306,
            "job_name": "hourly_import",
            "scheduler_job_record_id": null,
            "import_path": "/sa/tmp/hdfsImport",
            "parameters": "{\"session\":\"HdfsImport-306\"}",
            "start_time": "2018-09-10 19:02:08",
            "end_time": "2018-09-10 19:02:46",
            "counter_info": "{\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}",
            "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-0634112a-6b90-40db-a26d-5492dbc7b995.log",
            "event_job_id": null,
            "profile_job_id": "job_1536029076029_3084",
            "event_job_status": null,
            "profile_job_status": "SUCCESS",
            "event_data_load_status": null,
            "project_name": "test_project"
        },
        {
            "id": 7,
            "session_id": 306,
            "job_name": "hourly_import",
            "scheduler_job_record_id": null,
            "import_path": "/sa/tmp/hdfsImport",
            "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}",
            "start_time": "2018-09-10 18:58:45",
            "end_time": "2018-09-10 19:01:10",
            "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0}",
            "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-c7514335-5a55-42b8-bfd3-0ad7a27ec1a3.log",
            "event_job_id": "job_1536029076029_3082",
            "profile_job_id": "job_1536029076029_3083",
            "event_job_status": "SUCCESS",
            "profile_job_status": "FAILED",
            "event_data_load_status": "SUCCESS",
            "project_name": "test_project"
        }
    ]
    BASH


6. 更新日誌

6.1.1. 2018-09-13 (SA 版本:1.13)

  1. 匯入設定項中增加參數 disable_import_succeeded_source_path
  2. 匯入設定項中增加參數 write_import_info
  3. 可使用簡略的 hdfs_importer 命令替換 sh $SENSORS_ANALYTICS_HOME/tools/hdfs_importer/bin/hdfs_importer.sh
  4. 新增查詢匯入歷史子命令

6.1.2. 2017-11-29 (SA 版本:1.10.3329)

  1. 支援自定義 job_name 參數: user_define_job_name

6.1.3. 2017-07-10 (SA 版本:1.7.2628)

  1. 支援自定義 job_name 參數 mapper_max_memory_size_mb
  2. 支援用戶自定義 InputFormat,詳情請參考 github :HDFSImporter 自定義 InputFormat

6.1.4. 2017-07-04 (SA 版本:1.7.2614)

  1. 支援用戶自定義預處理模組,詳情請參考 github :HDFSImporter 預處理模組
  2. 新增 debug 模式