Before using HdfsImporter, please read the data model and the introduction to the data format.

Overview

HdfsImporter is used to import JSON-formatted historical or external data into Sensors Analytics. The data to be imported must first be placed on the HDFS of the Sensors Analytics cluster.

Usage

Operating Environment

HdfsImporter can only be used in the Sensors Analytics Cluster Edition, and can only run in the Hadoop environment of the Sensors Analytics cluster.

Steps

The steps for a new import operation are as follows:

  1. Put the data on any machine in the Sensors Analytics cluster, for example in the /home/sa_cluster directory as files data01, data02, data03, and so on (the file names can be arbitrary; txt, csv, log, and other file types are supported).
  2. Each line of the files must be a JSON record that conforms to the data format (see the sketch after this list for a sample line), and make sure the sa_cluster account has read and write permissions on the directory and its files.
  3. Create an HDFS directory, adjust its permissions, and upload the data to the Sensors Analytics cluster HDFS, for example to the /data/input directory. For other operations, refer to the HDFS shell command documentation.

    hdfs dfs -put /home/sa_cluster/data01 /data/input
  4. Switch to the sa_cluster account:

    sudo su - sa_cluster
  5. Please make sure that the data on HDFS is no longer being modified (adding, deleting, or modifying files), and then run HdfsImporter.
    1. Check the data for correctness without actually importing it: start debug mode. The data will only be written to the corresponding HDFS directory as JSON text and will not be actually imported into the Sensors Analytics system.

      hdfs_importer \
        --path /data/input \
        --project default \
        --debug
    2. After confirming that the data is correct, proceed with the actual import:

      hdfs_importer \
        --path /data/input \
        --project default
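For reference, a minimal sketch of steps 1-3 might look like the following. The file name, the example path /data/input, and the JSON field values are illustrative only; real records must follow the data format linked above.

    # steps 1-2: each line of /home/sa_cluster/data01 is one JSON record, for example:
    #   {"distinct_id":"user_001","type":"track","event":"ViewProduct","time":1609430400000,"properties":{"product_id":"p_42"}}
    #   {"distinct_id":"user_001","type":"profile_set","time":1609430400000,"properties":{"city":"Beijing"}}

    # step 3: create the HDFS directory (adjust permissions as needed) and upload the data
    hdfs dfs -mkdir -p /data/input
    hdfs dfs -put /home/sa_cluster/data01 /data/input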

Precautions

  • Deleting data after import is complicated, so check the data carefully before performing the import. Importing the same data multiple times will result in duplicated data.
  • path can be a data directory that contains subdirectories (see the example below).
  • After HdfsImporter finishes successfully, the imported data can be queried after approximately 1 minute.
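Since path can point at a nested directory, it can help to double-check which files will actually be picked up before starting the import. A quick check, assuming the example path /data/input used above:

    # list the import directory recursively; /data/input is the example path from the steps above
    hdfs dfs -ls -R /data/input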

Tool Running Parameters Description

  • List all running parameters:

    hdfs_importer --help
    Parameter | Required | Description | Default Value
    path | Yes | The HDFS path where the data to be imported is located. |
    project | No | The name of the project to import into. If not specified, the data is imported into the default project. Note: the project field inside the imported data is ignored; this parameter prevails. | default
    all_data_without_track_signup | No | Whether all data (including previously imported data) contains no track_signup records. If none of the imported data contains track_signup records, adding this option can improve import speed. Note: this option cannot be used if user association has been used in the target project. When all_data_without_track_signup is enabled, the data is treated as containing no track_signup records, which means is_login_id is ignored and existing associations in the system are ignored as well. |
    split_max_size_mb | No | The maximum split size, in MB, processed by each Mapper in MapReduce. This value affects the number of Mappers. | 512
    job_queue_name | No | The queue name of the MapReduce job. |
    split_reduce_size_mb | No | The amount of data, in MB, processed by each Reducer in MapReduce. This value determines the number of Reducers: number of Reducers = total input data size / split_reduce_size_mb. | 2048
    mapper_max_memory_size_mb | No | The maximum memory, in MB, used by each Mapper in MapReduce. | 1024
    reduce_max_memory_size_mb | No | The maximum memory, in MB, used by each Reducer in MapReduce. | 2048
    reduce_output_file_size_mb | No | The maximum size, in MB, of each file output by a Reducer in MapReduce. | 256
    shuffle_partition_split_size | No | The size of each partition when shuffling, usually used to adjust data distribution. | 500000
    expired_record_filter_after_hour | No | Data whose time is more than this many hours in the future is filtered out; the default of 1 means data more than 1 hour in the future is filtered out. | 1
    expired_record_filter_before_hour | No | Data whose time is more than this many hours in the past is filtered out; the default of 17520 means data older than 2 years is filtered out. | 17520
    user_define_job_name | No | A custom job name; for example, in HdfsImporter-12345-[your_name], your_name is the custom part. | None
    debug | No | Start debug mode. The data will only be written to the corresponding HDFS directory as JSON text and will not be imported into the Sensors Analytics system. |
    disable_import_succeeded_source_path | No | Disable re-importing of paths that have already been imported successfully. | false
    write_import_info | No | Generate an import info file in the directory where the command is executed. | false
    enable_first_time_processor | No | Enable first-day, first-time adjustment. | false
    profile_storage_mode | No | Profile data import mode. kafka: profile data is first sent to Kafka and then consumed by a Kafka consumer; kudu: profile data is written directly to Kudu. | kafka
  • A complex example:

    hdfs_importer \
      --path /data/input \
      --project test_project \
      --split_max_size_mb 128 \
      --job_queue_name data_import_queue \
      --split_reduce_size_mb 2048 \
      --reduce_max_memory_size_mb 2048 \
      --reduce_output_file_size_mb 256 \
      --disable_import_succeeded_source_path
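  • Another illustrative combination: importing data that contains no track_signup records under a custom job name. This sketch assumes the boolean options are passed as bare flags, in the same way as --disable_import_succeeded_source_path above; the options themselves are described in the table, and my_backfill is an illustrative name:

    # assumes bare-flag syntax for the boolean options; my_backfill is an illustrative job name
    hdfs_importer \
      --path /data/input \
      --project default \
      --all_data_without_track_signup \
      --user_define_job_name my_backfill \
      --write_import_info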

Frequently Asked Questions

Query Import History

Since Sensors Analytics 1.13, the import history of HdfsImporter can be queried; the query result is output in JSON format.

  • Usage:

    hdfs_importer list \
      --project_name default
  • List all configuration items:

    hdfs_importer list --help
    Parameter | Required | Description | Default Value
    project_name | No | Project name; query import task records for the specified project. |
    session_id | No | Session id; query import task records for the specified session id. |
    job_name | No | Job name; query import task records for the specified job_name. Here, job_name refers to the user_define_job_name of the import task. |
    status | No | Status of the HdfsImporter import task; the value can be WAITING, RUNNING, SUCCESS, or FAILED. |
    start_time | No | Start time; query import records whose start time is later than the specified time, in the format %Y-%m-%d %H:%M:%S. |
    max_num | No | The most recent N records; query the latest N import records. | 10
    dump | No | Output the query result to the specified FILENAME; if not specified, the result is output to the console. |
    full | No | If specified, all tasks are shown; otherwise, only the latest task with the same session_id is shown. | False
  • For example, query the hourly routine import task:

    hdfs_importer list \
      --project_name test_project \
      --job_name hourly_import \
      --status SUCCESS \
      --start_time '2018-08-30 00:00:00' \
      --max_num 2 \
      --dump /data/work/list_output_file1

    The result output to the file is:

    [
      { "id": 12, "session_id": 320, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}", "start_time": "2018-09-11 18:46:50", "end_time": "2018-09-11 18:49:46", "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0,\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-43c7d4ea-0b14-48f6-8b03-764178e927ae.log", "event_job_id": "job_1536029076029_3074", "profile_job_id": "job_1536029076029_3077", "event_job_status": "SUCCESS", "profile_job_status": "SUCCESS", "event_data_load_status": "SUCCESS", "project_name": "test_project" },
      { "id": 10, "session_id": 317, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}", "start_time": "2018-09-11 10:23:20", "end_time": "2018-09-11 10:26:21", "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0,\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-67a00f94-67d8-415e-a004-c9ca82a17a2a.log", "event_job_id": "job_1536029076029_3044", "profile_job_id": null, "event_job_status": "SUCCESS", "profile_job_status": null, "event_data_load_status": "SUCCESS", "project_name": "test_project" }
    ]
    Parameter | Description | Example
    id | Primary key of the hdfs_importer table | 8
    session_id | Session id | 306
    job_name | Name of the import task | hourly_import
    scheduler_job_record_id | Record id of the scheduled task | null
    import_path | HDFS path from which the data was imported | /sa/tmp/hdfsImport
    parameters | Configuration parameters of the import task | {\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}
    start_time | Import task start time | 2018-09-10 19:02:08
    end_time | Import task completion time | 2018-09-10 19:02:46
    counter_info | Counts of imported data records | {\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}
    log_path | Import task run log | /data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-0634112a-6b90-40db-a26d-5492dbc7b995.log
    event_job_id | Id of the event data import task | job_1536029076029_3082, or null if no event data was imported
    profile_job_id | Id of the user profile import task | job_1536029076029_3083, or null if no profile data was imported
    item_job_id | Id of the item data import task | job_1536029076029_3085, or null if no item data was imported
    event_job_status | Status of the event data import task | SUCCESS, FAILED, or null if no event data was imported
    profile_job_status | Status of the user profile import task | SUCCESS, FAILED, or null if no profile data was imported
    item_job_status | Status of the item data import task | SUCCESS, FAILED, or null if no item data was imported
    event_data_load_status | Event data loading status | SUCCESS, FAILED, or null if no event data was imported
    project_name | Project name | test_project
  • For example, to query the import status of all tasks for a specified session_id:

    hdfs_importer list \
      --session_id 306 \
      --full

    The result output to the console is:

    [
      { "id": 8, "session_id": 306, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"session\":\"HdfsImport-306\"}", "start_time": "2018-09-10 19:02:08", "end_time": "2018-09-10 19:02:46", "counter_info": "{\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-0634112a-6b90-40db-a26d-5492dbc7b995.log", "event_job_id": null, "profile_job_id": "job_1536029076029_3084", "event_job_status": null, "profile_job_status": "SUCCESS", "event_data_load_status": null, "project_name": "test_project" },
      { "id": 7, "session_id": 306, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}", "start_time": "2018-09-10 18:58:45", "end_time": "2018-09-10 19:01:10", "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-c7514335-5a55-42b8-bfd3-0ad7a27ec1a3.log", "event_job_id": "job_1536029076029_3082", "profile_job_id": "job_1536029076029_3083", "event_job_status": "SUCCESS", "profile_job_status": "FAILED", "event_data_load_status": "SUCCESS", "project_name": "test_project" }
    ]
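  • The JSON output can be post-processed with standard tools. For example, dumping failed tasks to a file and summarizing them with jq (jq is not part of Sensors Analytics; this sketch assumes it is installed on the machine, and /tmp/failed_imports.json is an illustrative path):

    hdfs_importer list \
      --project_name test_project \
      --status FAILED \
      --max_num 10 \
      --dump /tmp/failed_imports.json
    # print session_id, job_name, and start_time of each failed task (requires jq)
    jq '.[] | {session_id, job_name, start_time}' /tmp/failed_imports.json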

Get the data that failed validation

Use a command like the one below to locate the invalidLog folder under /sa/runtime/HDFSImporter or /sa/runtime/HDFSImporterOutdated. In the example, HdfsImporter-1 is the HdfsImporter session id and 4 is the id of the project the data was imported into, which can be obtained through the Multi-Project Management Tool. The invalidLog directory contains a series of files whose names start with invalid.logs; these hold the records that failed validation and can be exported from the Sensors Analytics server (a sketch follows the command).

hdfs dfs -ls /sa/runtime/HDFSImporter/HdfsImporter-1/data/4/invalidLog
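To inspect or export the invalid records, a sketch like the following can be used; the session id (HdfsImporter-1) and project id (4) are the same illustrative values as above:

# peek at the first few invalid records
hdfs dfs -cat /sa/runtime/HDFSImporter/HdfsImporter-1/data/4/invalidLog/invalid.logs* | head
# or download the whole invalidLog directory to the local machine for export
hdfs dfs -get /sa/runtime/HDFSImporter/HdfsImporter-1/data/4/invalidLog /home/sa_cluster/invalidLog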


Delete the events imported by hdfs_importer

If the SDF component is installed in your Sensors Analytics environment, refer to the SDFAdmin Data Cleaning Tool User Guide and configure the hdfs_import_names parameter to delete the events imported by hdfs_importer in batches. If SDF is not installed, refer to the sa_clean Data Cleaning Tool documentation for event data deletion.

  • Delete the data of batch HdfsImporter-1, imported by HdfsImporter, for November 1, 2020 in the httest project:

    sdfadmin event_delete -m execute -p httest -b 2020-11-01 -e 2020-11-01 --hdfs_import_names HdfsImporter-1


Find out which batch an event in the events table was imported by

Use the $hdfs_import_batch_name property in the events table to see in which HdfsImporter batch an event was imported.