If SDF is not installed, go to the /home/sa_cluster/sp/tools/batch_importer directory and use bin/sa-importer.

If SDF is installed, use bin/sa-importer in the $SENSORS_DATAFLOW_HOME/tools/batch_importer/ directory.

If you're not sure whether SDF is installed in your environment, please consult your data consultant for one-on-one assistance.

Before using it, please read the introductions to the data model and the data format.

1. Overview

The batch import tool is used to import historical or external data into Sensors Analytics. For real-time data import, please use LogAgent.

The data imported using the batch import tool needs to conform to the data format. The last section of this article also provides a brief format introduction.

2. Usage

2.1. Operating Environment

The batch import tool can only be used on the single machine, or on a machine in the cluster, where Sensors Analytics is deployed.

2.2. Usage Steps

This section introduces the most common usage methods of BatchImporter. For other functions, please refer to "3. Detailed Explanation of Tool Running Parameters".

The steps are as follows:

  1. Place the data in a directory. For example, the data to be imported is under /home/work/data in several files named part-001, part-002, part-003 (the naming can be arbitrary; txt, csv, log and other file types are supported, and each line must be a JSON record that conforms to the data format).
  2. Switch to the sa_cluster account.

    sudo su - sa_cluster
    CODE
  3. Find batch_importer.

    # batch_importer can be used directly
    batch_importer --help
    # If the command above does not exist, go to the directory below and use bin/sa-importer instead.
    # If SDF is installed, the directory is /home/sa_cluster/sdf/tools.
    cd /home/sa_cluster/sp/tools/batch_importer
    bin/sa-importer --help
    CODE
  4. Run the batch import tool to check data correctness without importing any actual data.
    Note: --path points to the folder where the data is located, not to a file, and all files in that folder will be read by the batch import tool.

    bin/sa-importer --path /home/work/data
    # or
    batch_importer --path /home/work/data
    CODE



  5. After running, statistical information will be displayed.

    Import session read 32 valid records, 0 lines can't be parsed, 0 records invalid.
    CODE

    This means there are 32 valid records, 0 lines that cannot be parsed, and 0 records that can be parsed but are invalid. Any invalid data will be reflected in the log.

  6. After the check in step 4 finds no issues, proceed with the actual data import.

    bin/sa-importer --path /home/work/data --import --session new --project default
    # or
    batch_importer --path /home/work/data --import --session new --project default
    CODE


    When the following information appears, it indicates the end of the import.

    Send 32 records to kafka
    Import /home/work/data completed.
    CODE

2.3. Notes

  • There are two methods to specify the project for data import (see the example after this list):
    • Add a 'project' field to the data (see data format). This way, data for multiple projects can be imported at once;
    • When starting the import, add the `--project` parameter. All data, whether or not the `project` field is specified, will be imported into the project given by that parameter.
  • sa_cluster needs access permissions to the data directory and files. You can switch to sa_cluster and tail the data files to see if they can be opened.
  • Cleaning up data after import is complex, so please check carefully before importing. Running the import multiple times on the same data will cause data duplication.
  • The batch import tool specifies the directory to be imported using the `--path` parameter, and imports all files in the directory. Please do not add, delete, or modify files in the directory after starting the import, otherwise the import results may not meet expectations.
  • The batch import tool reads the files in the specified folder in lexicographic order of the file names.
  • If Sensors Analytics has a real-time data stream running, please set a speed limit to avoid affecting real-time data. To set the speed limit, add the parameter `--speed_limit <limit>`, for example `--speed_limit 300`.
  • The tool does not support correcting first-day-first-time; it is recommended not to use this tool to import data containing first-day-first-time.
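
For example, the two ways of specifying the project might look as follows; the record contents and the project name are only illustrative:

# Method 1: carry a 'project' field in each record (one JSON per line in the data files)
{"type":"track","project":"default","event":"ViewPage","distinct_id":"abc123","time":1431353191267,"properties":{"page":"index"}}
# Method 2: force all records into one project at import time, optionally with a speed limit
batch_importer --path /home/work/data --import --session new --project default --speed_limit 300
CODE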

3. Detailed explanation of tool running parameters

3.1. Invocation parameters

Execute sa-importer in the deployment path of the batch import tool, or in another path.

sa-importer [parameters]
CODE

Parameter Introduction:

  • --help: Print usage instructions.
  • --type_define: Data type definitions for properties.
  • --import: Whether to actually import the data. If this parameter is not set, the run only validates the data.
  • --speed_limit: Import speed limit in records/second. The default is unlimited.
  • --sample: Output some sample data.
  • --path: The path of the data to be imported.
  • --session: The session file for this import. For a new import task, set it to "new".
  • --parser: Defaults to Json; in general, avoid using this parameter.
  • --manifest: After the batch import tool finishes running, write a manifest file to the path given by the parameter value. If a file already exists at that path on startup, the startup will fail.
  • --project: The project to import into. Note that if this parameter is specified, the project field in the data is ignored and all data is imported into the project specified by --project.
  • --expired_record_filter_after_hour: The number of hours in the future beyond which data is filtered out during import. The default is 1, i.e. data more than 1 hour in the future is filtered out.
  • --expired_record_filter_before_hour: The number of hours in the past beyond which data is filtered out during import. The default is 17520, i.e. data more than 2 years in the past is filtered out.
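
For reference, a validation-only run and an actual import combining several of these parameters might look like the following; the paths, project name, and limit value are only examples:

# Validate only: without --import, nothing is written
bin/sa-importer --path /home/work/data
# Import with a speed limit and a manifest file written at the end
bin/sa-importer --path /home/work/data --import --session new --project default --speed_limit 300 --manifest info_2015_12_22
CODE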


The batch import tool has two running modes: Data Validation Mode and Data Import Mode.

3.2. Data Validation Mode

Since data import is a complex process, we recommend validating the data before importing, mainly to check whether it complies with the basic format requirements (as described in the overview).

The process of using the batch import tool for data validation is as follows:

  1. Place the data to be validated in a directory on a machine where Sensors Analytics is deployed, for example /home/work/data. The directory should not contain any other irrelevant files that do not need validation.
  2. Run the batch import tool:
bin/sa-importer --path /home/work/data
CODE

At the end of the run, the statistics will be displayed:

Import session read 33128 valid records, 3 lines can't be parsed, 2 records invalid.
CODE

This means 33128 records are valid, 3 lines cannot be parsed, and 2 records are invalid.

If you want BatchImporter to stop immediately upon encountering error data, you can add the `--debug_exit_with_bad_record` parameter. For example:

bin/sa-importer --path /home/work/data --debug_exit_with_bad_record
CODE


3.3. Data Import Mode

The process of importing data includes starting a new import task and resuming an old import task.

  • The import mode is indicated by the --import parameter. If this parameter is not included, the mode will be data validation as described in 3.2.
  • In import mode, the --session <SessionFile> parameter must be explicitly specified, and if it is a new import task, set the value of SessionFile to "new".

3.3.1. Create a new import task

1. Place the data to be imported on a machine where the Sensors Analytics system is deployed, and ensure that there are no other unrelated files in the directory, for example, the path is /home/work/data.
2. Run the batch import tool using "--import --session new", and the path must be specified using "--path".

bin/sa-importer --path /home/work/data --import --session new --project default
CODE

The SessionID for each session will be displayed when it starts or is interrupted with Ctrl+C:

When starting: Importer Session File: '2015-06-19-18-19-50.session'

When interrupted with Ctrl+C: Import session file: '2015-06-19-18-19-50.session', you can run the importer with the argument '-session 2015-06-19-18-19-50.session' to continue the import.

3.3.2. Resuming import tasks

If a data import task is interrupted, it can be resumed using the SessionFile:

1. For example, if an import task for /home/work/data was interrupted.
2. Run the batch import tool with "--import"; "--session <SessionID>" must be specified, --path must not be specified, and --project must be the same as before.

bin/sa-importer --import --session 2015-06-19-18-19-50.session --project default
CODE

Note:

  • If you want to restore an import task, there should be no file changes (modifications, additions, deletions) in the previous directory, otherwise the import will fail. If you want to add more data to import, place the data in another directory and use "New Import Task".
  • In the command to restore the import task, --project must be the same as the project specified in the previous import task, otherwise the data from the restored task will be sent to a different project.

3.3.3. Interpretation of manifest file content

If the manifest parameter is used, a manifest file will be generated at the path given by the parameter value after the import process ends. This file contains basic statistics about the import and can be used by automation scripts as a "done" file (indicating the end of the import).

  • If the file already exists in the path specified by the manifest parameter value when starting the import, the import will fail.
  • Generating the manifest file is not necessary. The content of the file can be used for debugging and determining whether the import process is complete, as well as for basic validation of the import.
bin/sa-importer --path /home/work/data --session new --import --manifest info_2015_12_22
CODE

Example of the generated info_2015_12_22 file:

{
  "session_id" : "1",                  // SessionID of the import
  "do_import" : true,                  // whether data was imported; false means validation only
  "complete" : true,                   // whether the run finished successfully; false may indicate manual or abnormal interruption
  "read_files" : [ "/home/work/data/a", "/home/work/data/b" ],   // list of files actually read
  "plan_files" : [ "/home/work/data/a", "/home/work/data/b" ],   // list of files in the directory that should be read
  "valid_count" : 209,                 // number of valid records
  "total_count" : 209,                 // total number of records read
  "progress" : {
    "synced_source_progress" : {       // progress information
      "f" : "(dev=801,ino=1055397)",
      "o" : 32232,
      "n" : "b",
      "s" : 208,
      "c" : 208,
      "e" : "1"
    },
    "sended_source_progress" : {       // progress information
      "f" : "(dev=801,ino=1055397)",
      "o" : 32232,
      "n" : "b",
      "s" : 208,
      "c" : 208,
      "e" : "1"
    },
    "kafka_progress" : {               // Kafka progress information
      "0" : {
        "offset" : 22435,
        "partition_id" : 0,
        "update_timestamp" : 1450771040053
      },
      "1" : {
        "offset" : 22838,
        "partition_id" : 1,
        "update_timestamp" : 1450771045419
      },
      "2" : {
        "offset" : 23185,
        "partition_id" : 2,
        "update_timestamp" : 1450771040071
      }
    },
    "last_update_time" : 1450771042587,  // time statistics were last updated
    "last_sync_time" : 1450771045419,    // time Kafka was last written to
    "status" : {
      "start_times" : 1,
      "this_time_start_running_time" : 1450771040213,  // time the import was started
      "sending_speed" : 0.0,
      "sending_records_in_store" : 0,
      "counter_filtered_by_expired_time" : 0,
      "counter_invalid_log_entry" : 0,
      "counter_invalid_reader_log_entry" : 0,
      "sent_to_kafka" : 209,
      "raw_read_count" : 209,
      "message_counter" : {
        "counter_map" : { }
      }
    }
  }
}
CODE
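
As a rough sketch, an automation script could treat the manifest as a "done" file along the following lines; the file name comes from the example above, and the grep checks assume the field spacing shown there (a JSON parser would be more robust):

#!/bin/bash
# Wait for the manifest file to appear, then confirm the import actually ran and completed.
MANIFEST=info_2015_12_22
while [ ! -f "$MANIFEST" ]; do
  sleep 10
done
if grep -q '"do_import" : true' "$MANIFEST" && grep -q '"complete" : true' "$MANIFEST"; then
  echo "Batch import finished successfully."
else
  echo "Batch import did not complete; check the importer logs." >&2
  exit 1
fi
CODE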


4. Frequently Asked Questions

4.1. Getting the data that failed validation

The logs for the batch import tool are located in the Sensors Analytics log directory, usually $SENSORS_ANALYTICS_LOG_DIR/batch_importer or $SENSORS_DATAFLOW_LOG_DIR/tools. If any data is filtered out due to issues, it is stored separately in the invalid_records directory under that log directory.
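
For example, to inspect the filtered-out records you can list that directory; the exact layout depends on whether your deployment uses SDF:

# Without SDF
ls $SENSORS_ANALYTICS_LOG_DIR/batch_importer/invalid_records/
# With SDF
ls $SENSORS_DATAFLOW_LOG_DIR/tools/invalid_records/
CODE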

4.2. Restoring interrupted import tasks

If the batch_importer import task is interrupted or the server connection is lost during import, and you need to continue the previous import task to send the remaining data to Sensors Analytics, refer to "3.3.2. Resuming import tasks".


Appendix I. Data Format

There is a dedicated page introducing the data format; please refer to Data Format.

The files to be imported contain one JSON record per line:

{"type":"track","properties":{"property1":"value1","property2":"value2"},"event":"EventName","distinct_id":"DistinctId","original_id":"OriginalId","time":1431353191267}
CODE

Property Introduction:

  • type: Mandatory. The value can only be one of the types listed in the table below. This is the type of the record.
  • properties: Mandatory. Key-value pairs in JSON; these are the properties associated with the Event or Profile. Note that the type of each property must stay the same from start to finish; for example, it cannot change from NUMBER to STRING.
  • event: Mandatory for type track, not required for profile types. String. The name of the Event.
  • distinct_id: Mandatory. String. The user's fixed and unique identifier.
  • original_id: Mandatory for type track_signup, not used for other types. String. The random anonymous ID before registration.
  • time: Mandatory. Unix timestamp, accurate to milliseconds. The time corresponding to this record.

type: the type of this record. It can be one of the following:

  • track: An event and its associated properties.
  • track_signup: Tracks the user's registration behavior; this type is similar to track, except for the additional original_id field.
  • profile_set: Directly set a user's profile; if the user or profile already exists it is overwritten, otherwise it is created automatically.
  • profile_append: Append to a List-type profile of a user; if the user or profile does not exist it is created automatically.
  • profile_increment: Add to or subtract from a Numeric-type profile of a user. If the user does not exist it is created automatically; if the profile does not exist it defaults to 0.
  • profile_delete: Delete a user.
  • profile_unset: Delete a specific profile of a user.
  • item_set: Set an Item directly. If a field of the Item already exists it is overwritten; if not, it is created automatically.
  • item_delete: Delete the entire content of the Item.
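
As a rough illustration, a few of these types written as import lines might look like the following; the property names and values are made up for this example:

{"type":"track","event":"ViewPage","distinct_id":"abc123","time":1431353191267,"properties":{"page":"index"}}
{"type":"profile_set","distinct_id":"abc123","time":1431353191267,"properties":{"Gender":"Male"}}
{"type":"profile_append","distinct_id":"abc123","time":1431353191267,"properties":{"Interests":["movie","sport"]}}
CODE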