概述
HdfsImporter 与 BatchImporter 一样用于将 JSON 格式的历史数据或外部数据导入神策分析,HdfsImporter 导入的数据需要先放到神策分析集群上的 HDFS 上。
使用方法
运行环境
HdfsImporter 仅可在神策分析集群版使用,且只能运行在神策分析集群的 Hadoop 环境中。
使用步骤
一次新启动的导入步骤如下:
- 将数据置于神策分析集群任意一台机器上,例如 /home/sa_cluster 目录,其中包含文件 data01、data02、data03(命名可以随意)等文件(支持 txt、csv、log 等类型文件)。
- 文件内容为每行一个符合 数据格式 的 JSON,且需要保证 sa_cluster 账户对该目录文件有读写权限。
创建 HDFS 目录、调整权限、放置数据到神策分析集群 HDFS 上,比如 /data/input 目录,其他操作可以参考:hdfs 操作命令文档
hdfs dfs -put /home/sa_cluster/data01 /data/input
切换到 sa_cluster 账户:
sudo su - sa_cluster
- 请确定 HDFS 上数据不再进行修改(增删修改文件)后,运行 HdfsImporter。
先检查数据正确性,但不进行真的数据导入:开始 debug 模式,数据只会以 `JSON` 文本的方式输出到相应的 HDFS 目录下,并且不会真正的导入到神策系统当中。
hdfs_importer \ --path /data/input \ --project default \ --debug
检查无误后,进行真正的数据导入 :
hdfs_importer \ --path /data/input \ --project default
注意事项
- 导入后清理数据较复杂,请检查好再操作。对同一份数据多次运行导入会导致数据重复。
- path 可以是一个包含子目录的数据目录。
- HdfsImporter 执行成功之后,数据大约会在 1 分钟之后被查询到。
工具运行参数详解
列出所有运行的参数:
hdfs_importer --help
参数 必填 说明 默认值 path 是 需要导入的数据所在的 HDFS 路径。 project 否 期望导入的 project 名称,不指定的则导入到 default。 注:
导入数据中的 project 字段会失效,以此参数指定为准。default all_data_without_track_signup 否 是否所有数据(包括之前各种类型导入)都不包含 track_signup 类型数据。若所有导入数据均不包含 track_signup 类型数据,添加这个选项可以提高导入速度。注意,导入项目中使用过用户关联功能则不能使用该选项。开启 all_data_without_track_signup 之后会认为是不包含 track_signup 类型的数据,也就是说会忽略 is_login_id,同时也会忽略系统里已经存在的关联关系。 split_max_size_mb 否 MapReduce 每个 Mapper 处理分片的最大值。该值会影响 Mapper 数量。 512 job_queue_name 否 MapReduce Job 的队列名。 split_reduce_size_mb 否 MapReduce 每个 Reducer 处理的数据量大小。该值会影响 Reducer 的数据。Reducer 个数 = 输入的总数据量 / split_reduce_size_mb 。 2048 mapper_max_memory_size_mb 否 MapReduce 每个 Mapper 使用的内存最大值。 1024 reduce_max_memory_size_mb 否 MapReduce 每个 Reducer 使用的内存最大值。 2048 reduce_output_file_size_mb 否 MapReduce 每个 Reducer 输出的文件最大值。 256 shuffle_partition_split_size 否 Shuffer 时,每个 Mapper 切分 partition 的大小,一般用来调整数据分布不均匀 500000 expired_record_filter_after_hour 否 允许导入的数据时间区间截至未来的小时数,默认为 1,即超过未来 1 小时的数据将被过滤。 1 expired_record_filter_before_hour 否 允许导入的数据时间区间向前的小时数,默认为 17520,即超过 2 年以前的数据将被过滤。 17520 user_define_job_name 否 允许自定义 job name,比如在 HdfsImpoter-12345-[your_name] 中,your_name 是自定义的部分 无 debug 否 开始 debug 模式,数据只会以 JSON
文本的方式输出到相应的 HDFS 目录下,并且不会真正的导入到神策系统当中。disable_import_succeeded_source_path 否 禁止重复导入已成功过的导入路径 false write_import_info 否 在命令执行路径生成 import info 文件 false enable_first_time_processor 否 开启首日首次修正 false profile_storage_mode 否 profile 数据导入方式(kafka:profile 数据先发往 kafka,再由 kafka consumer 消费;kudu:profile 数据直接写入 kudu) kafka 一个复杂的例子:
hdfs_importer \ --path /data/input \ --project test_project \ --split_max_size_mb 128 \ --job_queue_name data_import_queue \ --split_reduce_size_mb 2048 \ --reduce_max_memory_size_mb 2048 \ --reduce_output_file_size_mb 256 \ --disable_import_succeeded_source_path
常见问题
查询导入历史
自神策 1.13 版本,支持查询 hdfs importer 导入历史, 查询结果以 JSON 格式输出。
使用方式 :
hdfs_importer list \ --project_name default
列出所有配置项 :
hdfs_importer list --help
参数 必填 说明 默认值 project_name 否 project name, 可指定按照 project name 查询导入任务记录 session_id 否 session id, 可指定按照 session id 查询导入任务记录 job_name 否 job name,可指定按照 job_name 查询导入任务记录。这里的 job_name 为导入任务中的 user_define_job_name status 否 hdfs importer 导入任务的状态,status 值可为 WAITING,RUNNING,SUCCESS,FAILED start_time 否 start time, 可查询startTime比传入的指定时间晚的导入记录, 格式%Y-%m-%d %H:%M:%S max_num 否 recent N times,可指定最近N次导入记录 10 dump 否 查询结果输出到指定的FILENAME;如果没有指定,则会输出到控制台 full 否 指定后则查看所有任务,否则同一个session_id只查看最新的任务 False 例如,查询按小时的例行导入任务:
hdfs_importer list \ --project_name test_project \ --job_name hourly_import \ --status SUCCESS \ --start_time '2018-08-30 00:00:00' \ --max_num 2 \ --dump /data/work/list_output_file1
输出到文件中的结果为:
[ { "id": 12, "session_id": 320, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}", "start_time": "2018-09-11 18:46:50", "end_time": "2018-09-11 18:49:46", "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0,\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-43c7d4ea-0b14-48f6-8b03-764178e927ae.log", "event_job_id": "job_1536029076029_3074", "profile_job_id": "job_1536029076029_3077", "event_job_status": "SUCCESS", "profile_job_status": "SUCCESS", "event_data_load_status": "SUCCESS", "project_name": "test_project" }, { "id": 10, "session_id": 317, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}", "start_time": "2018-09-11 10:23:20", "end_time": "2018-09-11 10:26:21", "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0,\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-67a00f94-67d8-415e-a004-c9ca82a17a2a.log", "event_job_id": "job_1536029076029_3044", "profile_job_id": null, "event_job_status": "SUCCESS", "profile_job_status": null, "event_data_load_status": "SUCCESS", "project_name": "test_project" } ]
参数 说明 样例 id hdfs_importer 表主键 8 session_id 会话 id 306 job_name 导入任务名 hourly_import scheduler_job_record_id 定时任务记录 id null import_path 导入的数据所在的 HDFS 路径 /sa/tmp/hdfsImport parameters 导入任务配置参数 {\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"} start_time 导入任务开始时间 2018-09-10 19:02:08 end_time 导入任务完成时间 2018-09-10 19:02:46 counter_info 导入数据条数 {\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0} log_path 导入任务运行日志 /data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-0634112a-6b90-40db-a26d-5492dbc7b995.log event_job_id 导入事件数据任务 id job_1536029076029_3082 或 null(没有导入事件数据) profile_job_id 导入用户属性任务 id job_1536029076029_3083 或 null(没有导入用户属性数据) item_job_id 导入物品属性任务 id job_1536029076029_3085 或 null(没有导入物品属性数据) event_job_status 导入事件数据任务状态 SUCCESS、FAILED 或 null(没有导入事件数据) profile_job_status 导入用户属性任务状态 SUCCESS、FAILED 或 null(没有导入用户属性数据) item_job_status 导入物品属性任务状态 SUCCESS、FAILED 或 null(没有导入物品属性数据) event_data_load_status 事件数据入库状态 SUCCESS、FAILED 或 null(没有导入事件数据) project_name 项目名 test_project 再如,查询指定 session_id 的所有导入任务状态:
hdfs_importer list \ --session_id 306 \ --full
输出到控制台的结果为
[ { "id": 8, "session_id": 306, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"session\":\"HdfsImport-306\"}", "start_time": "2018-09-10 19:02:08", "end_time": "2018-09-10 19:02:46", "counter_info": "{\"profile_read_count\":10104,\"profile_import_count\":10104,\"profile_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-0634112a-6b90-40db-a26d-5492dbc7b995.log", "event_job_id": null, "profile_job_id": "job_1536029076029_3084", "event_job_status": null, "profile_job_status": "SUCCESS", "event_data_load_status": null, "project_name": "test_project" }, { "id": 7, "session_id": 306, "job_name": "hourly_import", "scheduler_job_record_id": null, "import_path": "/sa/tmp/hdfsImport", "parameters": "{\"path\":\"/sa/tmp/hdfsImport\",\"project\":\"test_project\",\"user_define_job_name\":\"hourly_import\"}", "start_time": "2018-09-10 18:58:45", "end_time": "2018-09-10 19:01:10", "counter_info": "{\"event_read_count\":252600,\"event_import_count\":252600,\"event_skipped_count\":0}", "log_path": "/data/sa_cluster/logs/tools/hdfsimporter/hdfsimporter-c7514335-5a55-42b8-bfd3-0ad7a27ec1a3.log", "event_job_id": "job_1536029076029_3082", "profile_job_id": "job_1536029076029_3083", "event_job_status": "SUCCESS", "profile_job_status": "FAILED", "event_data_load_status": "SUCCESS", "project_name": "test_project" } ]
获取校验失败的数据
请参考如下命令去路径 /sa/runtime/HDFSImporter 或 /sa/runtime/HDFSImporterOutdated 找到 invalidLog 文件夹,其中 HdfsImporter-1 为 HdfsImporter session id,4 是将数据导入到哪个项目对应的 project id,可以通过多项目管理工具来获取。在 invalidLog 目录下有一系列 invalid.logs 开头的文件,即为错误数据保留的文件,然后再将此文件从神策服务器导出。
hdfs dfs -ls /sa/runtime/HDFSImporter/HdfsImporter-1/data/4/invalidLog
删除 hdfs_importer 导入的事件
若您的神策环境安装 SDF 组件,则可以参考 sdfadmin 数据清理工具使用说明来配置 hdfs_import_names 参数,实现按批次删除 hdfs_importer 导入的事件。若没有安装 SDF,则参考 sa_clean 数据清理工具说明文档进行事件数据的删除。
- 删除项目 httest 2020 年 11 月 1 号 hdfsImporter 导入批次为 HdfsImporter-1 的数据
sdfadmin event_delete -m execute -p httest -b 2020-11-01 -e 2020-11-01 --hdfs_import_names HdfsImporter-1
查看 events 表事件数据是哪个批次导入的
通过 events 表事件属性 $hdfs_import_batch_name 来查看事件是哪个批次的 hdfsImporter 导入的。