订阅 Kafka 实时数据
|
收藏
本文档所描述的内容属于神策分析的高级使用功能,涉及较多技术细节,适用于对相关功能有经验的用户参考。
神策分析设计为开放的架构,让用户可以订阅实时数据来满足更多使用场景。服务端接到一条 SDK 发来的数据后,会对数据做一些预处理并将数据写入到消息队列 Kafka 供下游各类计算模块使用。本文将介绍订阅 Kafka 数据的方法。
1.1. 订阅要求
订阅数据需要满足以下要求:
仅私有部署版支持通过 Kafka 订阅数据;
启动订阅的机器需与部署神策分析的机器在同一个内网,且必须可以解析神策分析服务器的 host;
如果在自己的机器上订阅数据,需要先在自己机器上 hosts 中配上神策的服务器 hostname 和订阅 IP 的映射关系,订阅的时候填写所有机器的 hostname;
建议使用与服务端保持一致的客户端版本,大部分客户的服务端版本大于等于 2.0.0,因此,建议使用的客户端版本也大于等于 2.0.0,具体情况可登录神策服务器查看或咨询神策值班同学。
1.2. 获取 Kafka 参数
- 登录任意的神策服务器
- 切换至 sa_cluster 账户
su - sa_cluster
- 使用以下命令获取地址
spadmin config get client -m kafka -p sp
aradmin config get client -m kafka -p sp
例如输出是:
{
"broker_list": [
"hostname1:9092",
"hostname2:9092",
"hostname3:9092"
],
"channel_callback_partition_count": 3,
"channel_callback_topic_name": "channel_topic",
"extra_data_topic_name": "extra_data_topic",
"item_partition_count": 3,
"item_topic_name": "item_topic",
"partitions_count": 10,
"topic_name": "event_topic"
}
如果命令输出中包含 "profile_topic_name": "profile_topic" 字段,则代表神策环境安装了 SDF 模块,神策环境是否安装 SDF 模块也可以咨询神策值班同学获取相关信息,Kafka 相关的参数说明如下
参数名称 | 参数值 | 说明 |
---|---|---|
topic | item_topic | 订阅神策 Items 表的数据 |
event_topic | 订阅神策 events 和 users 表的数据 | |
partition | 单机 3 个/集群 10 个 | 对应 partitions_count 里的值 |
broker | hostname1:9092,hostname2:9092,hostname3:9092 | 对应 broker_list 里的值,集群有多个 hostname,用英文逗号分隔 |
参数名称 | 参数值 | 说明 |
---|---|---|
topic | item_topic | 订阅神策 Items 表的数据 |
event_topic | 订阅神策 events 表的数据 | |
profile_topic | 订阅神策 users 表的数据 | |
partition | 单机 3 个/集群 10 个 | 对应 partitions_count 里的值 |
broker | hostname1:9092,hostname2:9092,hostname3:9092 | 对应 broker_list 里的值,集群有多个 hostname,用英文逗号分隔 |
1.3. 订阅数据
订阅有多种方式,可以选择一种适合使用场景的方式。
下面给出两种启动订阅的示例
1.3.1. 使用 Kafka Console Consumer
可以使用 Kafka 自带的 Kafka Console Consumer 通过命令行方式订阅,例如从最新数据开始订阅:
bin/kafka-console-consumer.sh --bootstrap-server <bootstrap-server>:9092 --topic event_topic
以上命令是需要您本地订阅的服务器安装 Kafka 客户端,然后进入 Kafka 路径下去执行
1.3.2. Java 代码订阅样例
请参考下面的 github 链接中的样例实现
1.4. 数据格式
订阅的数据的格式与数据格式基本一致。
Java SDK 上报的数据示例
{ "_track_id": -1302294273, "lib": { "$lib": "Java", "$lib_method": "code", "$lib_version": "3.1.15", "$lib_detail": "com.sensorsdata.analytics.javasdk.TestSA##track##TestSA.java##91" }, "distinct_id": "test0932", "type": "track", "event": "Order", "properties": { "$lib": "Java", "isLogin": false, "order_id": "real_1272", "$lib_version": "3.1.15" } }
订阅出来的数据示例
{ "_track_id": -1302294273, "lib": { "$lib": "Java", "$lib_method": "code", "$lib_version": "3.1.15", "$lib_detail": "com.sensorsdata.analytics.javasdk.TestSA##track##TestSA.java##91" }, "distinct_id": "test0932", "type": "track", "event": "Order", "properties": { "$lib": "Java", "isLogin": false, "order_id": "real_1272", "$lib_version": "3.1.15", "$ip": "10.90.28.102", "$is_login_id": false }, "time": 1600400230612, "project": "default", "token": "super", "extractor": { "f": null, "o": 0, "n": null, "s": 25, "c": 25, "e": "debugboxcreate1038.sa-DebugService" }, "recv_time": 1600400230612, "ngx_ip": "10.90.28.102", "process_time": 1600400230612, "map_id": "zjj-0932", "user_id": 81311457452485460, "project_id": 24, "ver": 2 }
神策 kafka 订阅出来的数据会加上部分神策内部字段,例如 extractor 相关的进度信息、project_id 项目 ID 等,您这边无需关注这些字段,按照您这边需要的字段去解析获取即可,一些可能需要的内部字段说明如下
参数名称 | 说明 |
---|---|
_track_id | 前端 SDK track 的时候上报的随机值,用于去重判断,并不会写入 events 表 |
project | 项目英文名,通过该字段判断神策对应的项目 |
token | 数据导入 token,参考:数据导入常见问题 中第 7 条说明 |
recv_time | 等于 events 表里的 $receive_time 的值,数据接收的时间 |
user_id | 等于 events 表里的 users_id 和 users 表里的 id,参考:标识用户 |
distinct_id、type、event、properties、time 字段的说明可以参考:数据格式
1.5. 常见问题
1.5.1. 从 Kafka 消费数据能否订阅历史数据
可以,神策的 kafka 单机默认保留一年的数据,集群默认保留一周的数据,如果是使用 Kafka 自带的 Kafka Console Consumer 通过命令行方式订阅,可以加上 --from-beginning 订阅历史数据
bin/kafka-console-consumer.sh --bootstrap-server hostname:9092 --topic event_topic --from-beginning
1.5.2. Kafka 订阅不到数据可能原因
- 网络问题:神策默认通过 hostname 来订阅 Kafka 数据,需要在订阅 Kafka 的服务器的 hosts 文件了配上神策服务器的 hostname 和 IP 的映射关系,确认下通过 telnet hostname 9092 是否能正常访问神策服务器
- Kafka 版本兼容问题:高版本服务端兼容低版本客户端,因此订阅的客户端版本建议不大于神策服务端的 Kafka 版
- 如果是测试用的 debug 不入库的模式上报数据,是不会在 Kafka event_topic 下面订阅到数据的
1.5.3. 能指定项目订阅 Kafka 数据吗
不能,通过对应 topic 订阅出来的数据是全部项目的,需要通过 JSON 数据里的 project 参数去区分不同的项目
1.5.4. 能按组订阅 Kafka 数据吗
可以,通过代码或者工具订阅 Kafka 数据的时候可以自定义 group.id 参数,用来区分不同的消费组,具体定义可以参考 Kafka 官网
1.5.5. 通过 profile_topic 订阅出事件数据正常吗
正常的,profile_topic 是订阅所有 users 表的数据,如果事件数据触发用户关联,去写 users 的 first_id,second_id ,是会在 profile_topic 里被订阅到,用户关联逻辑参考:标识用户
注:本文档内容为神策产品使用和技术细节说明文档,不包含适销类条款;具体企业采购产品和技术服务内容,以商业采购合同为准。