应用场景
客户的数据被写成了神策的 JSON 格式(数据格式)被放到了神策的 HDFS 文件 / 本地文件上。希望使用工具将数据导入神策的实体中,则使用本工具写入数据。
导入步骤
- 切换到 sa_cluster 账户
sudo su - sa_cluster
- 将数据置于神策分析集群任意一台机器上,例如 /home/sa_cluster 目录,其中包含文件 data01、data02、data03(命名可以随意)等文件(支持 txt、csv、log 等类型文件)。
- 文件内容为每行一个符合 数据格式 的 json,且需要保证 sa_cluster 账户对该目录文件有读写权限。
创建 HDFS 目录、调整权限、放置数据到神策分析集群 HDFS 上,比如 /data/input 目录
hdfs dfs -put /home/sa_cluster/data01 /data/input
执行导入命令 (当前为 attach 模式,需要一直等待才行)
integratoradmin importer run --job_name custome_batch_load_01 --path hdfs:///data/input
- 查看任务运行情况
integratoradmin importer list --job_name custome_batch_load_01
参数及输出详解
integratoradmin importer run 输入参数
参数 | 是否必传 | 描述 | 值 |
---|---|---|---|
--path | path / session 必传一个 | 导入文件位置 | file:///home/sa_cluster/xiao/test hdfs:///sa/runtime/xiao/test 格式为一行一个 json |
--job_name | 否 | 自定义导入任务名 | 用户任务名称,$integrator_import_batch_name 会用到, 不传用 session name 替代 |
--project | 否 | 项目名,传了会查出来并赋值给 id | 默认 default |
--project_id | 否 | 项目 id | 1 |
--debug | 否 | 调试模式 | true |
--parallelism | 否 | flink_job parallelism | 1 |
--mode | 否 | flink_job 运行模式 | 默认会自行判断 local / cluster |
--expired_record_filter_after_hour | 否 | ExpiredHandler 内将时间不符合要求的数据过滤 | 默认过滤未来 1 小时后,过去 2 年以前的数据(实际1年后就会被拦截), 1 / 24*365*2 |
--expired_record_filter_before_hour | 否 | ||
--enable_first_time_processor | 否 | 开启首日首次处理 | 默认 true |
--mem_mb | 否 | local 模式启动 run_importer_by_java 传的内存 | 默认 2048 |
--ytm | 否 | yarn task manager 内存 | 默认 2048 |
--yjm | 否 | yarn job manager 内存 | 默认 1024 |
--custom_processor_jar_path | 否 | 自定义预处理程序 jar 所在路径 | |
--custom_processor | 否 | 自定义预处理程序类名 | 如:com.sensorsdata.analytics.csvtojson.CSVToJSON |
--custom_processor_config | 否 | 自定义预处理程序的配置参数 |
integratoradmin importer list 输入输出
只会从库里查近 10000 条,之前的查不到
输入:
参数 | 是否必传 | 描述 | 值 |
---|---|---|---|
--project | 是 | 项目名,传了会查出来并赋值给 id | default |
--project_id | 项目 id | 1 | |
--job_name | 否,都是过滤条件,不传会查很多出来 | 任务名称 | |
--status | 任务状态 | ||
--start_time | 任务开始时间, '2022-01-01T01:00:00 | ||
--limit | 显示的任务数量 | 10 |
输出:
参数 | 描述 | 值的示例 |
---|---|---|
id | 任务id | 11 |
session_id | session id | 1 |
import_path | 导入路径 | "hdfs:///sa/runtime/xwc_test/import_event_1694610302531" |
log_path | 日志路径 | $SENSORS_DATA_INTEGRATOR_LOG_DIR/importer/flink-644db427-5dd5-4f90-9457-ad5aa7942a9e-client-hybrid01.log |
job_name | 任务名称 | custome_batch_load_01 |
start_time | 任务开始时间 | 2023-09-13 21:26:42 |
end_time | 任务结束时间 | 2023-09-13 21:30:24 |
counter_info | 各种类型的数据的counter值 | "event_import_count" : 200000, "all_read_count" : 200000, "all_import_count" : 200000, "all_skipped_count" : 0, "profile_read_count" : 0, "profile_import_count" : 0, "profile_skipped_count" : 0, "item_read_count" : 0, "item_import_count" : 0, "item_skipped_count" : 0, "detail_read_count" : 0, "detail_import_count" : 0, "detail_skipped_count" : 0 |
parameters | 调用任务时的参数输入 | "id" : -1, "job_name": "custome_batch_load_01", |
project_id | 项目id | 3 |
batch_process_stage_id | 批处理的id | importer_11_batch_process |
batch_load_to_parquet_stage_id | 转 parquet 的id | |
batch_load_to_sdw_stage_id | 批导入的id | importer_11_batch_load_to_sdw |
remapping_stage_id | remapping 的id | importer_11_remapping_send |
supply_data_stage_id | 补数据的id | importer_11_supply_data |
batch_process_stage_status | 批处理的状态 | success / fail / null |
batch_load_to_parquet_stage_status | 转 parquet 的状态 | success / fail / null |
batch_load_to_sdw_stage_status | 批导入状态 | success / fail / null |
remapping_stage_status | remapping 状态 | success / fail / null |
supply_data_stage_status | 补数据状态 | success / fail / null |
run_mode | 运行模式 | cluster |
last_heartbeat_time | 最后一次 heartbeat 时间 | 1694611822000 |
status | importer 的总状态 | success / fail / null |