未安装 SDF 可进入 /home/sa_cluster/sp/tools/batch_importer 这个路径下使用 bin/sa-importer,
如果有 SDF 可在 $SENSORS_DATAFLOW_HOME/tools/batch_importer/ 这个路径下使用 bin/sa-importer
如果不确认您的环境是否安装了 SDF,请咨询您的数据咨询顾问获取一对一的协助。
概述
批量导入工具用于将历史数据或外部数据导入神策分析进行使用。实时数据导入请使用 LogAgent。
使用批量导入工具导入的数据需要符合数据格式,本文最后附录也有简单格式介绍。
使用方法
运行环境
批量导入工具只能在部署神策分析的单机或集群机器上使用。
使用步骤
本节介绍 BatchImporter 最常见的使用方法,其他功能请参考“3.工具运行参数详解”。
步骤如下:
- 将数据置于某个路径,比如需要导入的数据在 /home/work/data 下,有 part-001、part-002、part-003(命名可以随意)几个文件(支持 txt、csv、log 等类型文件,且每行一个符合数据格式的 Json )。
切换到 sa_cluster 账户。
sudo su - sa_cluster
找到 batch_importer 。
# 可直接使用 batch_importer batch_importer --help # 如果提示上面的命令不存在, 则进入下面的目录使用 bin/sa-importer,如果有 sdf 那就是/home/sa_cluster/sdf/tools 这个路径下 cd /home/sa_cluster/sp/tools/batch_importer bin/sa-importer --help
运行批量导入工具,检查数据正确性,但不进行真的数据导入。
注意:path 指向的是数据所在文件夹而不是文件,该文件夹下的所有文件都会被批量导入工具读取。bin/sa-importer --path /home/work/data 或 batch_importer --path /home/work/data
运行后会显示统计信息,
Import session read 32 valid records, 0 lines can't be parsed, 0 records invalid.
这里说明有 32 条有效数据,0 条数据不可解析,0 条可解析但数据无效。若有无效数据,将会在日志里体现。
经过步骤 4 检查数据都没问题以后,进行真正的数据导入。
bin/sa-importer --path /home/work/data --import --session new --project default 或 batch_importer --path /home/work/data --import --session new --project default
当出现如下信息时说明导入结束。
Send 32 records to kafka Import /home/work/data completed.
注意事项
- 指定数据导入的项目有两种方法:
- 在数据中添加 `project` 字段(详见数据格式),使用这种方式可以一次导入多个项目的数据;
- 启动导入时,添加 `--project` 参数。所有数据无论是否指定 `project` 字段都将导入到参数设置的项目中。
- sa_cluster 需要有数据目录和文件的访问权限,可以切换到 sa_cluster 后 tail 一下数据文件看是否能打开。
- 导入后清理数据较复杂,请检查好再操作。对同一份数据多次运行导入会导致数据重复。
- 批量导入工具通过 --path 参数指定要导入的目录,并导入目录下所有的文件。请在启动导入后不要增删、修改目录下的文件,否则无法保证导入结果符合预期。
- 批量导入工具读取文件的顺序是按照指定文件夹中文件名的字典序。
- 如果 SensorsAnalytics 有正在运行的实时数据流,请设置限速以免影响实时数据,设置的方法是加参数 `--speed_limit
` ,例如 `--speed_limit 300` 。 - 不支持修正首日首次,建议不要使用该工具来导入包含有首日首次的数据。
工具运行参数详解
调用参数
在批量导入工具的部署路径或其他路径下执行 sa-importer 。
sa-importer [参数]
参数简介:
参数 | 说明 |
---|---|
--help | 打印使用说明 |
--type_define | properties 中各字段类型限定 |
--import | 是否执行导入,若不配置该参数,运行只做校验 |
--speed_limit | 导入速度限制,单位是 条/秒,默认为不限速 |
--sample | 输出一些样例数据。 |
--path | 需要导入数据的路径 |
--session | 本次运行导入的 Session 文件。如果是一次新的导入任务,请设置为 new |
--parser | 默认 Json ,一般请勿使用该参数 |
--manifest | 批量导入工具运行结束后,使用参数值作为路径输出 manifest 文件。若启动时该路径已经有文件,那么启动失败。 |
--project | 将数据导入的项目。请注意,若指定该参数,数据中的 project 字段将无效,本次运行的所有数据都将导入到 --project 指定的项目。 |
--expired_record_filter_after_hour | 允许导入的数据时间区间截至未来的小时数,默认为 1,即超过未来 1 小时的数据将被过滤。 |
--expired_record_filter_before_hour | 允许导入的数据时间区间向前的小时数,默认为 17520,即超过 2 年以前的数据将被过滤。 |
批量导入工具两种运行模式为:校验数据模式 和 导入数据模式。
校验数据模式
由于数据导入是一个复杂的过程,所以我们希望导入数据前用户可以对数据先进行简单的校验工作,主要是校验数据是否符合最基本的规范(见概述中数据格式的描述)。
批量导入工具校验功能的使用流程如下:
- 将需要检验的数据放入部署神策分析系统的某台机器中,该目录中不要包含其他不需要检验的无关文件,例如路径是 `/home/work/data` 。
- 运行批量导入工具:
bin/sa-importer --path /home/work/data
运行结束会显示统计:
Import session read 33128 valid records, 3 lines can't be parsed, 2 records invalid.
表示有 33128 条数据符合格式,3 条数据无法解析,2 条数据有问题。
若希望 BatchImporter 遇到错误数据立刻停止运行,可以添加参数 `–debug_exit_with_bad_record`。例如:
bin/sa-importer --path /home/work/data --debug_exit_with_bad_record
导入数据模式
导入数据的过程分为启动一个新的导入任务和恢复旧的导入任务。
- 导入数据模式的标志是 --import 参数,如果不加该参数则运行模式为 3.2 中介绍的校验数据。
- 导入数据模式必须使用 --session
参数显式的指定 SessionFile ,如果是一次新导入任务,请设置 SessionFile 的值为 new。
新建导入任务
1. 将需要检验的数据放入部署神策分析系统的某台机器中,该目录中不要包含其他不需要检验的无关文件,例如路径是 /home/work/data 。
2. 运行批量导入工具:使用 --import --session new ,必须指定路径 --path 。
bin/sa-importer --path /home/work/data --import --session new --project default
刚开始运行或 Ctrl+C 中断都会显示本次的 SessionID :
启动时:Importer Session File: '2015-06-19-18-19-50.session'
Ctrl+C 退出时:Import session file: '2015-06-19-18-19-50.session', you can run importer with arg '-session 2015-06-19-18-19-50.session' to continue import.
恢复导入任务
如果某次数据导入任务被中止,使用 SessionFile 可以进行该任务的恢复:
1. 例如某次导入 /home/work/data 的导入任务被中止。
2. 运行批量导入工具:使用 --import,必须指定 --session
bin/sa-importer --import --session 2015-06-19-18-19-50.session --project default
注意:
- 如果要恢复导入任务,那么之前使用的目录下不能有任何文件变动(修改内容,添加文件,删除文件),否则将启动失败。如需追加导入内容,请将数据放到其他目录,并使用“新建导入任务”。
- 恢复导入任务命令中 --project 与之前导入任务指定的 project 要相同,不然恢复任务的数据会发送到不同的项目里。
manifest 文件内容解读
若使用了 manifest 参数指定了 manifest 文件,导入运行结束后会在参数值目录生成 manifest 文件,该文件包含导入的基本统计信息,可用于自动化脚本,作为 done 文件(导入结束的标志)。
- 若启动导入时 manifest 参数值所指路径文件已经存在,那么导入启动会失败。
- 生成 manifest 不是必须的,该文件内容可用于调试和判断导入过程是否结束,并简单校验导入正确性。
bin/sa-importer --path /home/work/data --session new --import --manifest info_2015_12_22
生成的 info_2015_12_22 文件样例:
{
"session_id" : "1", // 导入的SessionID
"do_import" : true, // 是否导入数据,false 为只校验数据
"complete" : true, // 是否运行成功,false 可能是人为或异常中断
"read_files" : [ "/home/work/data/a", "/home/work/data/b" ], // 实际读取的文件列表
"plan_files" : [ "/home/work/data/a", "/home/work/data/b" ], // 目录下应该读取的文件列表
"valid_count" : 209, // 有效数据条数
"total_count" : 209, // 总读取条数
"progress" : {
"synced_source_progress" : { // 进度信息
"f" : "(dev=801,ino=1055397)",
"o" : 32232,
"n" : "b",
"s" : 208,
"c" : 208,
"e" : "1"
},
"sended_source_progress" : { // 进度信息
"f" : "(dev=801,ino=1055397)",
"o" : 32232,
"n" : "b",
"s" : 208,
"c" : 208,
"e" : "1"
},
"kafka_progress" : { // kafka进度信息
"0" : {
"offset" : 22435,
"partition_id" : 0,
"update_timestamp" : 1450771040053
},
"1" : {
"offset" : 22838,
"partition_id" : 1,
"update_timestamp" : 1450771045419
},
"2" : {
"offset" : 23185,
"partition_id" : 2,
"update_timestamp" : 1450771040071
}
},
"last_update_time" : 1450771042587, // 上次更新统计时间
"last_sync_time" : 1450771045419, // 上次写kafka时间
"status" : {
"start_times" : 1,
"this_time_start_running_time" : 1450771040213, // 启动导入时间
"sending_speed" : 0.0,
"sending_records_in_store" : 0,
"counter_filtered_by_expired_time" : 0,
"counter_invalid_log_entry" : 0,
"counter_invalid_reader_log_entry" : 0,
"sent_to_kafka" : 209,
"raw_read_count" : 209,
"message_counter" : {
"counter_map" : { }
}
}
}
}
常见问题
获取校验失败的数据
批量导入工具的日志在神策分析的目录下,一般是在 $SENSORS_ANALYTICS_LOG_DIR/batch_importer 或 $SENSORS_DATAFLOW_LOG_DIR/tools。由于数据有问题被过滤的数据将额外存储在上述日志目录的 invalid_records 中。
恢复中断的导入任务
将 batch_importer 导入任务中断,或执行导入中间出现服务器连接断开,现在需要继续之前的导入任务来将剩下的数据发送到神策,则可以参考“3.3.2 恢复导入任务”来操作。
附录 I. 数据格式
另外有专门页面介绍数据格式,请参考 数据格式
需要导入的文件每行为一条如下格式的 JSON :
{"type":"track","properties":{"propertie1":"value1","propertie2":"value2"},"event":"EventName","distinct_id":"DistinctId","original_id":"OriginalId","time":1431353191267}
属性简介:
属性名 | 要求 | 含义 |
---|---|---|
type | 必须字段,值只能是下表中给出的 | 这条记录的类型 |
properties | 必须字段,JSON 的 k-v 对 注意:每个 property 的类型需要保证从始至终都是同一个。如一开始为 NUMBER ,之后不能变为 STRING | Event 或 Profile 关联的属性 |
event | type 为 track 类时为必须字段,profile 类时不需设置。字符串 | Event 的名字 |
distinct_id | 必须字段。字符串 | 用户的固定且唯一标识 |
original_id | type 为 track_signup 时为必须字段,其他 type 该字段没有用。字符串 | 注册之前的随机匿名ID |
time | 必须字段。unix 时间戳,精确到毫秒! | 这条记录对应的时间 |
type:这条数据的类型,可以是如下类型中的一种
Type | 解释 |
---|---|
track | 一个 Event 及关联的 Properties |
track_signup | 跟踪用户的注册行为,该接口与 track 基本相同,除了多了一个 original_id 参数。 |
profile_set | 直接设置一个用户的 Profile,如果用户或者 Profile 已存在则覆盖,不存在则自动创建。 |
profile_append | 追加一个用户的某个 List 类型的 Profile,如果用户或者 Profile 不存在则自动创建。 |
profile_increment | 增加或减少一个用户的某个 Numeric 类型的 Profile,如果用户不存在则自动创建, Profile 不存在则默认为 0。 |
profile_delete | 删除一个用户。 |
profile_unset | 删除一个用户的某个特定的 Profile。 |
item_set | 直接设置一个 Item,如果 Item 的字段已存在,则覆盖,不存在则自动创建。 |
item_delete | 删除整个 Item 内容。 |