应用场景

客户的数据被写成了神策的 JSON 格式(数据格式)被放到了神策的 HDFS 文件 / 本地文件上。希望使用工具将数据导入神策的实体中,则使用本工具写入数据。

导入步骤

  • 切换到 sa_cluster 账户
sudo su - sa_cluster
CODE
  • 将数据置于神策分析集群任意一台机器上,例如 /home/sa_cluster 目录,其中包含文件 data01、data02、data03(命名可以随意)等文件(支持 txt、csv、log 等类型文件)。
  • 文件内容为每行一个符合 数据格式 的 json,且需要保证 sa_cluster 账户对该目录文件有读写权限。
  • 创建 HDFS 目录、调整权限、放置数据到神策分析集群 HDFS 上,比如 /data/input 目录

hdfs dfs -put /home/sa_cluster/data01 /data/input
CODE
  • 执行导入命令   (当前为 attach 模式,需要一直等待才行)

integratoradmin importer run --job_name custome_batch_load_01 --path hdfs:///data/input
CODE


  • 查看任务运行情况
integratoradmin importer list --job_name custome_batch_load_01
CODE

参数及输出详解

integratoradmin importer run 输入参数

参数是否必传描述

 --path

path / session 必传一个

导入文件位置

file:///home/sa_cluster/xiao/test

hdfs:///sa/runtime/xiao/test

格式为一行一个 json

--job_name

自定义导入任务名

用户任务名称,$integrator_import_batch_name 会用到, 不传用 session name 替代

--project

项目名,传了会查出来并赋值给 id

默认 default

--project_id

项目 id

1

--debug

调试模式

true

--parallelism

flink_job parallelism

1

--mode

flink_job 运行模式

默认会自行判断 local / cluster

--expired_record_filter_after_hour

ExpiredHandler 内将时间不符合要求的数据过滤

默认过滤未来 1 小时后,过去 2 年以前的数据(实际1年后就会被拦截), 1 / 24*365*2

--expired_record_filter_before_hour

--enable_first_time_processor

开启首日首次处理

默认 true

--mem_mb

local 模式启动 

run_importer_by_java 传的内存

默认 2048

--ytm

yarn task manager 内存

默认 2048

--yjm

yarn job manager 内存

默认 1024

--custom_processor_jar_path

自定义预处理程序 jar 所在路径


--custom_processor

自定义预处理程序类名

如:com.sensorsdata.analytics.csvtojson.CSVToJSON

--custom_processor_config

否 

自定义预处理程序的配置参数


integratoradmin importer list 输入输出

只会从库里查近 10000 条,之前的查不到

输入:

参数是否必传描述

--project


项目名,传了会查出来并赋值给 id

default

--project_id

项目 id

1

--job_name

否,都是过滤条件,不传会查很多出来




任务名称

--status

任务状态

--start_time

任务开始时间, '2022-01-01T01:00:00


--limit

显示的任务数量10

输出:

参数

描述

值的示例

id

任务id

11

session_id

session id

1

import_path

导入路径"hdfs:///sa/runtime/xwc_test/import_event_1694610302531"

log_path

日志路径$SENSORS_DATA_INTEGRATOR_LOG_DIR/importer/flink-644db427-5dd5-4f90-9457-ad5aa7942a9e-client-hybrid01.log
job_name任务名称custome_batch_load_01

start_time

任务开始时间

2023-09-13 21:26:42

end_time

任务结束时间2023-09-13 21:30:24
counter_info各种类型的数据的counter值   "event_import_count" : 200000,
    "all_read_count" : 200000,
    "all_import_count" : 200000,
    "all_skipped_count" : 0,
    "profile_read_count" : 0,
    "profile_import_count" : 0,
    "profile_skipped_count" : 0,
    "item_read_count" : 0,
    "item_import_count" : 0,
    "item_skipped_count" : 0,
    "detail_read_count" : 0,
    "detail_import_count" : 0,
    "detail_skipped_count" : 0
parameters调用任务时的参数输入

  "id" : -1,
    "path" : "hdfs:///sa/runtime/xwc_test/import_event_1694610302531",
    "session_id" : -1,
    "project" : "xwc",

    "job_name": "custome_batch_load_01",
    "project_id" : 3,
    "debug" : false,
    "parallelism" : 5,
    "mode" : "cluster",
    "enable_first_time_processor" : true,
    "expired_record_filter_after_hour" : 1,
    "expired_record_filter_before_hour" : 17520,
    "enable_id_mapping" : false,
    "log_file_name" : "$SENSORS_DATA_INTEGRATOR_LOG_DIR/importer/flink-644db427-5dd5-4f90-9457-ad5aa7942a9e-client-hybrid01.log",
    "re_group_user_data" : false

project_id

项目id3

batch_process_stage_id

批处理的id

importer_11_batch_process

batch_load_to_parquet_stage_id

转 parquet 的id

batch_load_to_sdw_stage_id

批导入的id

importer_11_batch_load_to_sdw

remapping_stage_id

remapping 的id

importer_11_remapping_send

supply_data_stage_id

补数据的id

importer_11_supply_data

batch_process_stage_status

批处理的状态success / fail / null

batch_load_to_parquet_stage_status

转 parquet 的状态success / fail / null

batch_load_to_sdw_stage_status

批导入状态success / fail / null

remapping_stage_status

remapping 状态success / fail / null

supply_data_stage_status

补数据状态success / fail / null

run_mode

运行模式

cluster

last_heartbeat_time

最后一次 heartbeat 时间

1694611822000

status

importer 的总状态success / fail / null