HdfsImporter
Before using this tool, please read the data model and data format documentation.
Overview
HdfsImporter imports JSON-formatted historical or external data into Sensors Analytics. The data to be imported must first be placed on the HDFS of the Sensors Analytics cluster.
Usage
Operating Environment
HdfsImporter is available only in the Sensors Analytics Cluster Edition, and can run only in the Hadoop environment of the Sensors Analytics cluster.
Steps
The steps for a new import operation are as follows:
- Put the data on any machine in the Sensors Analytics cluster, for example in the /home/sa_cluster directory, as files data01, data02, data03, and so on (the names can be arbitrary; txt, csv, log, and other file types are supported). Each line of a file must be one JSON record that conforms to the data format, and the sa_cluster account must have read and write permissions on the directory and its files.
- Switch to the sa_cluster account:
sudo su - sa_cluster
- Create an HDFS directory, adjust its permissions, and put the data onto the Sensors Analytics cluster HDFS, for example under the /data/input directory. For other operations, refer to the HDFS command documentation.
hdfs dfs -put /home/sa_cluster/data01 /data/input
- Make sure the data on HDFS is no longer being modified (no files added, deleted, or changed), then run HdfsImporter.
- First check the data without actually importing it by starting debug mode: the data will only be output to the corresponding HDFS directory as JSON text and will not be imported into the Sensors Analytics system.
hdfs_importer \
    --path /data/input \
    --project default \
    --debug
- After confirming the data is correct, run the actual import:
hdfs_importer \
    --path /data/input \
    --project default
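Before uploading, it can help to sanity-check that every line of each file is valid JSON. Below is a minimal sketch in Python; the sample records and the field names checked (distinct_id, type) follow the general Sensors Analytics data format, but treat this as an illustration, not the authoritative schema:

```python
import json

# Hypothetical sample lines, as they might appear in a file such as data01.
# Each line must be exactly one JSON object conforming to the data format.
lines = [
    '{"distinct_id":"u123","type":"track","event":"ViewPage","time":1609430400000,"properties":{"page":"home"}}',
    '{"distinct_id":"u123","type":"profile_set","time":1609430400000,"properties":{"city":"Beijing"}}',
]

def check_line(line: str) -> bool:
    """Return True if the line parses as a JSON object with basic required fields."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return False
    return isinstance(record, dict) and "distinct_id" in record and "type" in record

print(all(check_line(l) for l in lines))  # True if every line is acceptable
```

Running such a check locally before `hdfs dfs -put` avoids a full debug-mode round trip just to catch malformed lines.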
Precautions
- Deleting data after an import is complicated, so check the data before performing this operation. Importing the same data multiple times will result in duplicated data.
- path can be a data directory that contains subdirectories.
- After HdfsImporter finishes successfully, the data becomes queryable approximately 1 minute later.
Tool Running Parameters Description
List all running parameters:
hdfs_importer --help
| Parameter | Required | Description | Default |
| --- | --- | --- | --- |
| path | Yes | The HDFS path where the data to be imported is located. | |
| project | No | The name of the project to import into. If not specified, data is imported into the default project. Note: the project field inside the imported data is ignored; this parameter prevails. | default |
| all_data_without_track_signup | No | Whether all data (including previously imported data) contains no track_signup records. If so, adding this option can improve import speed. Note: if user association has been used in the target project, this option cannot be used. With this option enabled, the data is treated as having no track_signup records, so is_login_id is ignored and existing associations in the system are also ignored. | |
| split_max_size_mb | No | Maximum shard size processed by each MapReduce Mapper. This value affects the number of Mappers. | 512 |
| job_queue_name | No | The queue name of the MapReduce job. | |
| split_reduce_size_mb | No | The amount of data processed by each Reducer in MapReduce. This value affects the number of Reducers: number of Reducers = total input data size / split_reduce_size_mb. | 2048 |
| mapper_max_memory_size_mb | No | The maximum memory used by each Mapper in MapReduce. | 1024 |
| reduce_max_memory_size_mb | No | The maximum memory used by each Reducer in MapReduce. | 2048 |
| reduce_output_file_size_mb | No | The maximum file size output by each Reducer in MapReduce. | 256 |
| shuffle_partition_split_size | No | The size of each partition when shuffling, usually used to adjust data distribution. | 500000 |
| expired_record_filter_after_hour | No | Data more than this many hours in the future is filtered; the default of 1 means data beyond the next hour is filtered. | 1 |
| expired_record_filter_before_hour | No | Data more than this many hours in the past is filtered; the default of 17520 means data older than 2 years is filtered. | 17520 |
| user_define_job_name | No | Allows customizing the job name; for example, in HdfsImporter-12345-[your_name], your_name is the customized part. | None |
| debug | No | Start debug mode. The data will only be output to the corresponding HDFS directory as JSON text and will not be imported into the Sensors Analytics system. | |
| disable_import_succeeded_source_path | No | Disable re-importing of paths that were already imported successfully. | false |
| write_import_info | No | Generate an import info file in the command execution path. | false |
| enable_first_time_processor | No | Enable first-time adjustment on the first day. | false |
| profile_storage_mode | No | Profile data import mode (kafka: profile data is sent to Kafka first and then consumed by a Kafka consumer; kudu: profile data is written directly to Kudu). | kafka |

A complex example:
hdfs_importer \
    --path /data/input \
    --project test_project \
    --split_max_size_mb 128 \
    --job_queue_name data_import_queue \
    --split_reduce_size_mb 2048 \
    --reduce_max_memory_size_mb 2048 \
    --reduce_output_file_size_mb 256 \
    --disable_import_succeeded_source_path
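As a quick illustration of how split_reduce_size_mb drives parallelism: per the parameter table, the number of Reducers equals the total input data size divided by split_reduce_size_mb. The sample sizes below are made up, and rounding up to a whole Reducer is an assumption for the sketch:

```python
import math

split_reduce_size_mb = 2048   # default value from the parameter table
total_input_mb = 10_000       # hypothetical total input size in MB

# Number of Reducers = total input data size / split_reduce_size_mb,
# rounded up here so leftover data still gets a Reducer (assumption).
num_reducers = math.ceil(total_input_mb / split_reduce_size_mb)
print(num_reducers)  # 5 with these sample numbers
```

Lowering split_reduce_size_mb therefore increases the Reducer count for the same input, at the cost of more, smaller output files.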
Frequently Asked Questions
Query Import History
Since Sensors Analytics version 1.13, HdfsImporter import history can be queried; the query result is output in JSON format.
Usage:
hdfs_importer list \
    --project_name default

List all configuration items:
hdfs_importer list --help
| Parameter | Required | Description | Default |
| --- | --- | --- | --- |
| project_name | No | Project name; query import task records for the specified project. | |
| session_id | No | Session id; query import task records for the specified session id. | |
| job_name | No | Job name; query import task records for the specified job_name. Here, job_name refers to the user_define_job_name of the import task. | |
| status | No | Status of the HdfsImporter import task; the value can be WAITING, RUNNING, SUCCESS, or FAILED. | |
| start_time | No | Start time; query import records whose start time is later than the specified time, in the format %Y-%m-%d %H:%M:%S. | |
| max_num | No | Query the latest N import records. | 10 |
| dump | No | Output the query result to the specified FILENAME; if not specified, output to the console. | |
| full | No | If specified, view all tasks; otherwise, only the latest task with the same session_id is shown. | False |

For example, query the hourly routine import task:
hdfs_importer list \
    --project_name test_project \
    --job_name hourly_import \
    --status SUCCESS \
    --start_time '2018-08-30 00:00:00' \
    --max_num 2 \
    --dump /data/work/list_output_file1
The result output to the file is:
[ { "id": 12, "session_id": 320, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}", "start_time": "2018-09-11 18:46:50", "end_time": "2018-09-11 18:49:46", "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0,\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-43c7d4ea-0b14-48f6-8b03-764178e927ae.log", "event_job_id": "job_1536029076029_3074", "profile_job_id": "job_1536029076029_3077", "event_job_status": "SUCCESS", "profile_job_status": "SUCCESS", "event_data_load_status": "SUCCESS", "project_name": "test_project" }, { "id": 10, "session_id": 317, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}", "start_time": "2018-09-11 10:23:20", "end_time": "2018-09-11 10:26:21", "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0,\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-67a00f94-67d8-415e-a004-c9ca82a17a2a.log", "event_job_id": "job_1536029076029_3044", "profile_job_id": null, "event_job_status": "SUCCESS", "profile_job_status": null, "event_data_load_status": "SUCCESS", "project_name": "test_project" } ]
| Parameter | Description | Example |
| --- | --- | --- |
| id | Primary key of the hdfs_importer table | 8 |
| session_id | Session ID | 306 |
| job_name | Name of the import task | hourly_import |
| scheduler_job_record_id | Record ID of the scheduled task | null |
| import_path | HDFS path of the imported data | /sa/tmp/hdfsImport |
| parameters | Import task configuration parameters | {\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"} |
| start_time | Import task start time | 2018-09-10 19:02:08 |
| end_time | Import task completion time | 2018-09-10 19:02:46 |
| counter_info | Number of data records imported | {\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0} |
| log_path | Import task run log | /data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-0634112a-6b90-40db-a26d-5492dbc7b995.log |
| event_job_id | Event data import job id | job_1536029076029_3082, or null (no event data imported) |
| profile_job_id | User profile data import job id | job_1536029076029_3083, or null (no user profile data imported) |
| item_job_id | Item data import job id | job_1536029076029_3085, or null (no item data imported) |
| event_job_status | Event data import job status | SUCCESS, FAILED, or null (no event data imported) |
| profile_job_status | User profile data import job status | SUCCESS, FAILED, or null (no user profile data imported) |
| item_job_status | Item data import job status | SUCCESS, FAILED, or null (no item data imported) |
| event_data_load_status | Event data loading status | SUCCESS, FAILED, or null (no event data imported) |
| project_name | Project name | test_project |

For example, to query the import status of all tasks for a specified session_id:
hdfs_importer list \
    --session_id 306 \
    --full
The result output to the console is:
[ { "id": 8, "session_id": 306, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"session\":\"HdfsImport-306\"}", "start_time": "2018-09-10 19:02:08", "end_time": "2018-09-10 19:02:46", "counter_info": "{\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-0634112a-6b90-40db-a26d-5492dbc7b995.log", "event_job_id": null, "profile_job_id": "job_1536029076029_3084", "event_job_status": null, "profile_job_status": "SUCCESS", "event_data_load_status": null, "project_name": "test_project" }, { "id": 7, "session_id": 306, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}", "start_time": "2018-09-10 18:58:45", "end_time": "2018-09-10 19:01:10", "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-c7514335-5a55-42b8-bfd3-0ad7a27ec1a3.log", "event_job_id": "job_1536029076029_3082", "profile_job_id": "job_1536029076029_3083", "event_job_status": "SUCCESS", "profile_job_status": "FAILED", "event_data_load_status": "SUCCESS", "project_name": "test_project" } ]
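Since `hdfs_importer list` emits JSON, its output is easy to post-process with a script. The sketch below sums the imported event counts from the counter_info field of records shaped like those above; the trimmed-down sample data is illustrative, and note that counter_info is a JSON string nested inside the outer JSON, so it must be parsed a second time:

```python
import json

# A trimmed-down example of `hdfs_importer list` output (illustrative only).
output = '''
[
  {"session_id": 320, "counter_info": "{\\"event_read_count\\":252600,\\"event_import_count\\":252600,\\"event_skipped_count\\":0}"},
  {"session_id": 317, "counter_info": "{\\"profile_read_count\\":10104,\\"profile_import_count\\":10104}"}
]
'''

records = json.loads(output)
total_events = 0
for rec in records:
    # counter_info is itself a JSON string and must be parsed separately
    counters = json.loads(rec["counter_info"])
    total_events += counters.get("event_import_count", 0)

print(total_events)  # 252600 for this sample
```

The same pattern works on a file produced with --dump: read the file, json.loads it once for the record list, then parse each counter_info string.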
Get the data that failed validation
Data that fails validation is kept in an invalidLog folder under /sa/runtime/HDFSImporter or /sa/runtime/HDFSImporterOutdated. In the command below, HdfsImporter-1 is the HdfsImporter session id and 4 is the id of the project the data was imported into, which can be obtained through the Multi-Project Management Tool. The invalidLog directory contains a series of files whose names start with invalid.logs; these files hold the rejected data and can be exported from the Sensors Analytics server.
hdfs dfs -ls /sa/runtime/HDFSImporter/HdfsImporter-1/data/4/invalidLog
Delete the events imported by hdfs_importer
If your Sensors Analytics environment has the SDF component installed, refer to the SDFAdmin Data Cleaning Tool User Guide and configure the hdfs_import_names parameter to delete events imported by hdfs_importer in batches. If SDF is not installed, refer to the sa_clean Data Cleaning Tool documentation for event data deletion.
- For example, delete the data of batch HdfsImporter-1, imported by HdfsImporter on November 1, 2020, in the httest project:
sdfadmin event_delete -m execute -p httest -b 2020-11-01 -e 2020-11-01 --hdfs_import_names HdfsImporter-1
Find out which batch imported an event in the events table
Use the $hdfs_import_batch_name property in the events table to see which HdfsImporter batch imported the event.
Note: The content of this document is a technical document that provides details on how to use the Sensors product and does not include sales terms; the specific content of enterprise procurement products and technical services shall be subject to the commercial procurement contract.