BatchImporter
If SDF is not installed, change into the /home/sa_cluster/sp/tools/batch_importer path and use bin/sa-importer.
If SDF is installed, use bin/sa-importer under the $SENSORS_DATAFLOW_HOME/tools/batch_importer/ path.
If you are not sure whether SDF is installed in your environment, please consult your data consultant for one-on-one assistance.
Before using the tool, please read the introductions to the data model and the data format.
1. Overview
The batch import tool imports historical or external data into Sensors Analytics. For real-time data import, please use LogAgent instead.
Data imported with the batch import tool must conform to the data format; the appendix at the end of this article also gives a brief introduction to the format.
2. Usage
2.1. Operating Environment
The batch import tool can only be used on the standalone machine, or on a cluster machine, where Sensors Analytics is deployed.
2.2. Usage Steps
This section introduces the most common usage methods of BatchImporter. For other functions, please refer to "3. Detailed Explanation of Tool Running Parameters".
The steps are as follows:
- Place the data in some path. For example, suppose the data to be imported is under /home/work/data, in several files named part-001, part-002, part-003 (the names can be arbitrary; txt, csv, log, and other file types are supported). Each line must be a JSON object that conforms to the data format.
- Switch to the sa_cluster account:
sudo su - sa_cluster
- Find batch_importer:
# batch_importer may be usable directly
batch_importer --help
# If the command above is not found, change into the directory below and use bin/sa-importer
# (if SDF is installed, the directory is under /home/sa_cluster/sdf/tools instead)
cd /home/sa_cluster/sp/tools/batch_importer
bin/sa-importer --help
- Run the batch import tool to check data correctness without importing any actual data. Note: --path points to the folder where the data is located, not to a file; all files in that folder will be read by the batch import tool.
bin/sa-importer --path /home/work/data
or
batch_importer --path /home/work/data
- After the run, statistics are displayed:
Import session read 32 valid records, 0 lines can't be parsed, 0 records invalid.
This means there are 32 valid records, 0 lines that could not be parsed, and 0 records that parsed but were invalid. Any invalid data is recorded in the log.
- After the check in the previous step shows no issues, perform the actual data import:
bin/sa-importer --path /home/work/data --import --session new --project default
or
batch_importer --path /home/work/data --import --session new --project default
- When the following information appears, the import has finished:
Send 32 records to kafka
Import /home/work/data completed.
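The validate-then-import flow above can be sketched as a small wrapper (a sketch only, in Python for illustration; it assumes the tool is invoked as `bin/sa-importer` from the current directory, with exactly the parameters shown in the steps):

```python
import subprocess

def build_importer_args(data_path, do_import=False, project=None, session="new"):
    """Build the sa-importer argument list.

    Validation mode omits --import; import mode adds --import,
    --session and (optionally) --project, as in the steps above.
    """
    args = ["bin/sa-importer", "--path", data_path]
    if do_import:
        args += ["--import", "--session", session]
        if project:
            args += ["--project", project]
    return args

def run_import(data_path, project="default"):
    # Step 1: validation-only run (no --import flag).
    subprocess.run(build_importer_args(data_path), check=True)
    # Step 2: after reviewing the statistics, the actual import.
    subprocess.run(build_importer_args(data_path, do_import=True,
                                       project=project), check=True)
```

The helper only assembles the argument lists; reviewing the validation statistics before running the real import remains a manual step.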
2.3. Notes
- There are two methods to specify the project for data import:
- Add a `project` field to the data (see data format). This way, data for multiple projects can be imported in one run;
- When starting the import, add the `--project` parameter. All data, whether the `project` field is specified or not, will be imported into the project specified by the parameter.
- sa_cluster needs access permission to the data directory and files. You can switch to sa_cluster and `tail` the data files to check that they can be opened.
- Cleaning up data after an import is complex, so check carefully before you act. Running the import multiple times on the same data will duplicate the data.
- The batch import tool specifies the directory to be imported using the `--path` parameter, and imports all files in the directory. Please do not add, delete, or modify files in the directory after starting the import, otherwise the import results may not meet expectations.
- The batch import tool reads the files in the specified folder in lexicographic order of the file names.
- If SensorsAnalytics has a running real-time data stream, please set a speed limit to avoid affecting real-time data. The method to set the speed limit is to add the parameter `--speed_limit <limit>`, for example, `--speed_limit 300`.
- The tool does not support correcting first-day/first-time flags; it is recommended not to use this tool to import data containing first-day/first-time properties.
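The lexicographic read order noted above has a common pitfall: without zero padding, `part-10` sorts before `part-2`. A plain-Python illustration:

```python
# Files in the --path directory are read in lexicographic (string)
# order of their names, which differs from numeric order once file
# counters pass one digit.
files = ["part-2", "part-10", "part-1"]
print(sorted(files))   # ['part-1', 'part-10', 'part-2']

# Zero-padded names sort the same way lexicographically and numerically.
padded = ["part-002", "part-010", "part-001"]
print(sorted(padded))  # ['part-001', 'part-002', 'part-010']
```

Zero-padding file names (part-001, part-002, ...) keeps the read order intuitive.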
3. Detailed explanation of tool running parameters
3.1. Invocation parameters
Run sa-importer from the batch import tool's deployment path (or from any other path):
sa-importer [options]
Parameter Introduction:
Parameter | Description |
---|---|
--help | Print usage instructions |
--type_define | Data type definition in properties |
--import | Whether to execute import. If this parameter is not configured, the operation will only perform validation. |
--speed_limit | Import speed limit in records/second. The default is unlimited. |
--sample | Output some example data. |
--path | The path of the data to be imported. |
--session | The session file for the current import. If it is a new import task, set it to "new". |
--parser | Default is Json, please avoid using this parameter in general. |
--manifest | After the batch import tool finishes running, use the parameter value as the path to output the manifest file. If the path already has a file when started, the startup will fail. |
--project | The project to import into. Note that when this parameter is specified, any `project` field in the data is ignored and all data is imported into the project given by --project. |
--expired_record_filter_after_hour | Specifies the number of hours in the future to filter data during import. Default value is 1, which means data more than 1 hour in the future will be filtered out. |
--expired_record_filter_before_hour | Specifies the number of hours in the past to filter data during import. Default value is 17520, which means data more than 2 years in the past will be filtered out. |
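The two `expired_record_filter_*` parameters define an acceptance window around the time of import. The check can be pictured like this (our own illustrative sketch, not the tool's actual code):

```python
import time

def within_window(record_time_ms, now_ms=None,
                  after_hours=1, before_hours=17520):
    """Illustrate the default acceptance window: records more than
    `before_hours` in the past or `after_hours` in the future are
    filtered out during import."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    hour_ms = 3600 * 1000
    return (now_ms - before_hours * hour_ms
            <= record_time_ms
            <= now_ms + after_hours * hour_ms)

now = 1_700_000_000_000  # a fixed "import time" in milliseconds
print(within_window(now, now_ms=now))                    # True
print(within_window(now + 2 * 3600 * 1000, now_ms=now))  # False: more than 1 hour in the future
```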
The batch import tool has two running modes: Data Validation Mode and Data Import Mode.
3.2. Data Validation Mode
Since data import is a complex process, we recommend validating the data before importing, mainly checking that it complies with the basic format requirements (as described in the overview).
The process of using the batch import tool for data validation is as follows:
- Place the data to be validated in the directory of a machine where Sensors Analytics is deployed. The directory should not contain any other irrelevant files that do not require validation, for example, the path is `/home/work/data`.
- Run the batch import tool:
bin/sa-importer --path /home/work/data
At the end of the run, the statistics will be displayed:
Import session read 33128 valid records, 3 lines can't be parsed, 2 records invalid.
This means 33128 records are valid, 3 lines could not be parsed, and 2 records were invalid.
If you want BatchImporter to stop immediately upon encountering error data, you can add the `--debug_exit_with_bad_record` parameter. For example:
bin/sa-importer --path /home/work/data --debug_exit_with_bad_record
3.3. Data Import Mode
The process of importing data includes starting a new import task and resuming an old import task.
- The import mode is indicated by the --import parameter. If this parameter is not included, the mode will be data validation as described in 3.2.
- In import mode, the --session <SessionFile> parameter must be explicitly specified, and if it is a new import task, set the value of SessionFile to "new".
3.3.1. Create a new import task
1. Place the data to be imported on a machine where the Sensors Analytics system is deployed, and make sure the directory contains no unrelated files; for example, the path is /home/work/data.
2. Run the batch import tool with "--import --session new"; the data path must be specified with "--path".
bin/sa-importer --path /home/work/data --import --session new --project default
The SessionID for each session will be displayed when it starts or is interrupted with Ctrl+C:
When starting: Importer Session File: '2015-06-19-18-19-50.session'
When interrupted with Ctrl+C: Import session file: '2015-06-19-18-19-50.session', you can run the importer with the argument '-session 2015-06-19-18-19-50.session' to continue the import.
3.3.2. Restore import tasks
If a data import task is interrupted, it can be restored using the SessionFile:
1. For example, if an import task for /home/work/data was interrupted.
2. Run the batch import tool with "--import"; "--session <SessionID>" must be specified, --path must not be specified, and --project must be the same as before.
bin/sa-importer --import --session 2015-06-19-18-19-50.session --project default
Note:
- If you want to restore an import task, there should be no file changes (modifications, additions, deletions) in the previous directory, otherwise the import will fail. If you want to add more data to import, place the data in another directory and use "New Import Task".
- In the command to restore the import task, --project must be the same as the project specified in the previous import task, otherwise the data from the restored task will be sent to a different project.
3.3.3. Interpretation of manifest file content
If the manifest parameter is used to specify the manifest file, a manifest file will be generated in the directory specified by the parameter value after the import process ends. This file contains basic statistics about the import and can be used for automation scripts as a "done" file (indicating the end of the import).
- If the file already exists in the path specified by the manifest parameter value when starting the import, the import will fail.
- Generating the manifest file is not necessary. The content of the file can be used for debugging and determining whether the import process is complete, as well as for basic validation of the import.
bin/sa-importer --path /home/work/data --session new --import --manifest info_2015_12_22
Example of the generated info_2015_12_22 file:
{
  "session_id" : "1",          // SessionID of the import
  "do_import" : true,          // whether data was imported; false means validation only
  "complete" : true,           // whether the run finished; false may mean manual or abnormal interruption
  "read_files" : [ "/home/work/data/a", "/home/work/data/b" ],  // files actually read
  "plan_files" : [ "/home/work/data/a", "/home/work/data/b" ],  // files in the directory that should be read
  "valid_count" : 209,         // number of valid records
  "total_count" : 209,         // total number of records read
  "progress" : {
    "synced_source_progress" : {  // progress information
      "f" : "(dev=801,ino=1055397)",
      "o" : 32232,
      "n" : "b",
      "s" : 208,
      "c" : 208,
      "e" : "1"
    },
    "sended_source_progress" : {  // progress information
      "f" : "(dev=801,ino=1055397)",
      "o" : 32232,
      "n" : "b",
      "s" : 208,
      "c" : 208,
      "e" : "1"
    },
    "kafka_progress" : {          // kafka progress information
      "0" : { "offset" : 22435, "partition_id" : 0, "update_timestamp" : 1450771040053 },
      "1" : { "offset" : 22838, "partition_id" : 1, "update_timestamp" : 1450771045419 },
      "2" : { "offset" : 23185, "partition_id" : 2, "update_timestamp" : 1450771040071 }
    },
    "last_update_time" : 1450771042587,  // time the statistics were last updated
    "last_sync_time" : 1450771045419,    // time of the last write to kafka
    "status" : {
      "start_times" : 1,
      "this_time_start_running_time" : 1450771040213,  // time this import run started
      "sending_speed" : 0.0,
      "sending_records_in_store" : 0,
      "counter_filtered_by_expired_time" : 0,
      "counter_invalid_log_entry" : 0,
      "counter_invalid_reader_log_entry" : 0,
      "sent_to_kafka" : 209,
      "raw_read_count" : 209,
      "message_counter" : { "counter_map" : { } }
    }
  }
}
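Since the manifest can serve as a "done" file, an automation script can parse it to confirm the import succeeded. A minimal sketch (field names taken from the example above; the completeness criteria are our own choice):

```python
import json

def check_manifest(manifest_text):
    """Return True if the manifest indicates a complete import:
    the run finished, data was actually imported, every planned file
    was read, and every record read was valid."""
    m = json.loads(manifest_text)
    return (m["complete"]
            and m["do_import"]
            and sorted(m["read_files"]) == sorted(m["plan_files"])
            and m["valid_count"] == m["total_count"])

sample = '''{"session_id": "1", "do_import": true, "complete": true,
             "read_files": ["/home/work/data/a", "/home/work/data/b"],
             "plan_files": ["/home/work/data/a", "/home/work/data/b"],
             "valid_count": 209, "total_count": 209}'''
print(check_manifest(sample))  # True
```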
4. Frequently Asked Questions
4.1. Getting the data that failed validation
The logs for the batch import tool are located in the directory of Sensors Analytics, usually in $SENSORS_ANALYTICS_LOG_DIR/batch_importer or $SENSORS_DATAFLOW_LOG_DIR/tools. If any data is filtered due to issues, it will be stored separately in the invalid_records directory of the aforementioned log directory.
4.2. Restoring interrupted import tasks
If the batch_importer import task is interrupted or the server connection is disconnected during import, and you need to continue the previous import task to send the remaining data to Sensors Analytics, you can refer to "3.3.2 Resuming Import Task" for operation.
Appendix I. Data Format
There is also a dedicated page introducing the data format; please refer to Data Format.
The file to be imported is in JSON format, with one record per line:
{"type":"track","properties":{"property1":"value1","property2":"value2"},"event":"EventName","distinct_id":"DistinctId","original_id":"OriginalId","time":1431353191267}
Property Introduction:
Property Name | Requirement | Meaning |
---|---|---|
type | Mandatory field; the value can only be one of those shown in the table below | The type of this record |
properties | Mandatory field; k-v pairs in JSON. Note: the type of each property must remain the same throughout, e.g. it cannot change from NUMBER to STRING. | Properties associated with the Event or Profile |
event | Mandatory field for type track, not required for type profile. String | The name of the Event |
distinct_id | Required field. String | User's fixed and unique identifier |
original_id | Required field for type track_signup, not used for other types. String | Random anonymous ID before registration |
time | Required field. Unix timestamp, accurate to milliseconds! | The time corresponding to this record |
type: The type of this data, can be one of the following types:
Type | Explanation |
---|---|
track | An event and associated properties |
track_signup | Track the user's registration behavior, this API is similar to track, except for the additional original_id parameter. |
profile_set | Directly set a user's profile, override if the user or profile already exists, create automatically if not. |
profile_append | Append a List-type profile of a user, create automatically if the user or profile does not exist. |
profile_increment | Add or subtract a Numeric type Profile of a user. If the user does not exist, it will be automatically created. If the Profile does not exist, it will default to 0. |
profile_delete | Delete a user. |
profile_unset | Delete a specific Profile of a user. |
item_set | Set an Item directly. If the field of the Item already exists, it will be overwritten. If it does not exist, it will be automatically created. |
item_delete | Delete the entire content of the Item. |
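To tie the appendix together, here is a sketch that builds records of the shapes described above and emits one JSON object per line, ready for a file under --path (the event and property names are purely illustrative):

```python
import json
import time

def track(distinct_id, event, properties):
    """Build a `track` record: an event plus properties, with a
    millisecond Unix timestamp as the format requires."""
    return {
        "type": "track",
        "event": event,
        "distinct_id": distinct_id,
        "time": int(time.time() * 1000),  # milliseconds
        "properties": properties,
    }

def profile_set(distinct_id, properties):
    """Build a `profile_set` record: set (or overwrite) profile fields."""
    return {
        "type": "profile_set",
        "distinct_id": distinct_id,
        "time": int(time.time() * 1000),
        "properties": properties,
    }

# One JSON object per line, as the import format requires.
records = [
    track("user-123", "BuyGold", {"price": 12.3}),
    profile_set("user-123", {"city": "Beijing"}),
]
for r in records:
    print(json.dumps(r, ensure_ascii=False))
```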
Note: The content of this document is a technical document that provides details on how to use the Sensors product and does not include sales terms; the specific content of enterprise procurement products and technical services shall be subject to the commercial procurement contract.