菜单

Integrator Importer

应用场景

客户的数据被写成了神策的 JSON 格式(数据格式)被放到了神策的 HDFS 文件 / 本地文件上。希望使用工具将数据导入神策的实体中,则使用本工具写入数据。

导入步骤

  1. 切换到 sa_cluster 账户
sudo su - sa_cluster
  1. 将数据置于神策分析集群任意一台机器上,例如 /home/sa_cluster 目录,其中包含文件 data01、data02、data03(命名可以随意)等文件(支持 txt、csv、log 等类型文件)。
  2. 文件内容为每行一个符合 数据格式 的 JSON,且需要保证 sa_cluster 账户对该目录文件有读写权限。
  3. 创建 HDFS 目录、调整权限、放置数据到神策分析集群 HDFS 上,比如 /data/input 目录。
hdfs dfs -put /home/sa_cluster/data01 /data/input
  1. 执行导入命令(当前为 attach 模式,需要一直等待才行)。
integratoradmin importer run --job_name custome_batch_load_01 --path hdfs:///data/input
  1. 查看任务运行情况
integratoradmin importer list --job_name custome_batch_load_01

参数及输出详解

integratoradmin importer run 输入参数

参数 是否必传 描述
--path path session 必传一个 导入文件位置 file:///home/sa_cluster/xiao/test
hdfs:///sa/runtime/xiao/test
格式为一行一个 JSON
--job_name 自定义导入任务名 用户任务名称,$integrator_import_batch_name 会用到,不传则使用 session name 替代
--project 项目名,传入后会查出并赋值给 ID 默认 default
--project_id 项目 ID 1
--debug 调试模式 true
--parallelism flink_job parallelism 1
--mode flink_job 运行模式 默认会自行判断 localcluster
--expired_record_filter_after_hour ExpiredHandler 内将时间不符合要求的数据过滤 默认过滤未来 1 小时后,过去 2 年以前的数据(实际 1 年后就会被拦截),1 / 24 * 365 * 2
--expired_record_filter_before_hour
--enable_first_time_processor 开启首日首次处理 默认 true
--mem_mb local 模式启动
run_importer_by_java 传的内存
默认 2048
--ytm yarn task manager 内存 默认 2048
--yjm yarn job manager 内存 默认 1024
--custom_processor_jar_path 自定义预处理程序 JAR 所在路径  
--custom_processor 自定义预处理程序类名 如:com.sensorsdata.analytics.csvtojson.CSVToJSON
--custom_processor_config 自定义预处理程序的配置参数  

integratoradmin importer list 输入输出

只会从库里查近 10000 条,之前的查不到

输入:

参数 是否必传 描述
--project 项目名,传了会查出来并赋值给 id default
--project_id 项目 id 1
--job_name 否,都是过滤条件,不传会查很多出来 任务名称  
--status 任务状态  
--start_time 任务开始时间, '2022-01-01T01:00:00  
--limit 显示的任务数量 10

输出:

参数 描述 值的示例
id 任务 id 11
session_id session id 1
import_path 导入路径 "hdfs:///sa/runtime/xwc_test/import_event_1694610302531"
log_path 日志路径 $SENSORS_DATA_INTEGRATOR_LOG_DIR/importer/flink-644db427-5dd5-4f90-9457-ad5aa7942a9e-client-hybrid01.log
job_name 任务名称 custome_batch_load_01
start_time 任务开始时间 2023-09-13 21:26:42
end_time 任务结束时间 2023-09-13 21:30:24
counter_info 各种类型的数据的 counter 值 "event_import_count": 200000,
"all_read_count": 200000,
"all_import_count": 200000,
"all_skipped_count": 0,
"profile_read_count": 0,
"profile_import_count": 0,
"profile_skipped_count": 0,
"item_read_count": 0,
"item_import_count": 0,
"item_skipped_count": 0,
"detail_read_count": 0,
"detail_import_count": 0,
"detail_skipped_count": 0
parameters 调用任务时的参数输入 "id": -1,
"path": "hdfs:///sa/runtime/xwc_test/import_event_1694610302531",
"session_id": -1,
"project": "xwc",
"job_name": "custome_batch_load_01",
"project_id": 3,
"debug": false,
"parallelism": 5,
"mode": "cluster",
"enable_first_time_processor": true,
"expired_record_filter_after_hour": 1,
"expired_record_filter_before_hour": 17520,
"enable_id_mapping": false,
"log_file_name": "$SENSORS_DATA_INTEGRATOR_LOG_DIR/importer/flink-644db427-5dd5-4f90-9457-ad5aa7942a9e-client-hybrid01.log",
"re_group_user_data": false
project_id 项目 id 3
batch_process_stage_id 批处理的 id importer_11_batch_process
batch_load_to_parquet_stage_id 转 parquet 的 id  
batch_load_to_sdw_stage_id 批导入的 id importer_11_batch_load_to_sdw
remapping_stage_id remapping 的 id importer_11_remapping_send
supply_data_stage_id 补数据的 id importer_11_supply_data
batch_process_stage_status 批处理的状态 success / fail / null
batch_load_to_parquet_stage_status 转 parquet 的状态 success / fail / null
batch_load_to_sdw_stage_status 批导入状态 success / fail / null
remapping_stage_status remapping 状态 success / fail / null
supply_data_stage_status 补数据状态 success / fail / null
run_mode 运行模式 cluster
last_heartbeat_time 最后一次 heartbeat 时间 1694611822000
status importer 的总状态 success / fail / null
上一个
FormatImporter
下一个
LogAgent
最近修改: 2025-03-19