本文档所描述的内容属于神策分析的高级使用功能,涉及调用神策 OpenAPI,仅适用于有相关经验的用户参考。
概述
神策系统采用开放架构设计,支持用户通过订阅实时数据满足更多使用场景。服务端在接收到 SDK 发送的数据后,会进行处理并入库,同时支持将数据写入 Kafka 消息队列,供下游定制化计算模块使用。
本文将介绍如何将实时数据订阅到 Kafka 消息队列。
订阅要求
订阅实时数据需要满足以下要求:
- 仅私有部署版支持通过 Kafka 实时订阅数据
- 需准备独立的 Kafka 集群用于接收订阅数据(非神策分析 Kafka 集群,可以是自建 Kafka 或阿里云等云版 Kafka,暂不支持 kerberos 认证)
- 神策分析部署机器必须能访问接收数据的 Kafka 集群, 可通过在神策分析部署机器上 telnet {kafka_broker_hostname} 9092 的方式验证连通性
- 建议使用与神策服务端一致的 Kafka 版本(当前版本为 2.8.2)
- 神策分析版本要求:分析云 3.0.1+ 套餐
准备工作
准备接收数据的 Kafka 及 Topic
用户需自备 Kafka 集群用于接收订阅数据,神策分析会将实时数据推送至此集群。需提前在 Kafka 集群中创建以下 Topic
数据类型 | Topic名称示例 | 说明 |
用户属性数据 |
simple_attribute_data_topic |
用于接收用户属性数据 |
用户事件数据 |
event_data_topic |
用于接收用户事件数据 |
用户明细数据 |
detail_data_topic |
用于接收用户明细数据 |
准备 OpenAPI 认证信息
订阅实时数据需调用神策 OpenAPI 接口,请参考 OpenAPI 认证方式 获取 API 密钥并构建请求头(api-key、sensorsdata-project 等)。
订阅相关 OpenAPI 接口与例子
注册订阅方
在正式订阅数据前,需要先在神策系统中注册订阅方。 订阅方信息包括:
- application_name:订阅方业务唯一标识,由用户业务自己取名
- 接收数据的 Kafka 信息:即 3.1 中用户自备 Kafka 集群的 broker 地址及 Topic
神策系统会基于 application_name 为订阅方生成一个唯一标识 application_id,并将 application_id 和 接收数据的 Kafka 进行绑定
后续调用订阅数据接口时只需要携带 application_id,此 application_id 下订阅的所有数据都会发送到绑定的 Kafka
注意:用户一般只需要注册一个订阅方,如果下游计算业务需要的数据不同,可订阅全部所需数据到 Kafka 后,不同业务使用不同的 group.id 自行消费过滤
调用 OpenAPI 注册订阅方
- 接口:http://{host}:8107/api/v3/horizon/v1/data-subscription/application/add
- 请求方式:POST 请求
- HEADER
- api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
- sensorsdata-project: production
- HEADER
-
请求 BODY 示例:
{
"application": {
"application_name": "{application_name}",
"application_type": "KAFKA_BASED_APP",
"kafka_based_app_config": {
"data_format_type": "NESTED_JSON",
"profile_kafka_descriptor": {
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "simple_attribute_data_topic"
}
},
"event_kafka_descriptor": {
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "event_data_topic"
}
},
"detail_kafka_descriptor": {
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "detail_data_topic"
}
}
}
}
}
注册订阅方的完整 curl 命令示例如下:
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/application/add' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{
"application":{
"application_name": "{application_name}",
"application_type": "KAFKA_BASED_APP",
"kafka_based_app_config": {
"data_format_type": "NESTED_JSON",
"profile_kafka_descriptor":{
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "simple_attribute_data_topic"
}
},
"event_kafka_descriptor":{
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "event_data_topic"
}
},
"detail_kafka_descriptor":{
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "detail_data_topic"
}
}
}
}
}'
curl 命令中需要传入的相关参数说明如下:
参数名称 | 参数值示例 | 说明 |
api-key | #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx | OpenAPI 接口认证信息,参考 OpenAPI 认证方式 |
application_id | dataworks.company_a.app.sensorsdata.cloud | 订阅方唯一标识 |
broker_list | hostname1:9092,hostname2:9092,hostname3:9092 | 用户接收数据的 Kafka 对应 broker_list 里的值,集群有多个 hostname,用英文逗号分隔 |
data_format_type | NESTED_JSON |
订阅方期望接收到的数据格式。目前支持的数据格式有两种,默认数据格式为 NESTED_JSON
|
host | 192.168.1.1 | 神策分析集群的域名 或神策分析集群 元数据节点的 IP |
project_name | production | 项目英文名称 ,即 3.2.2 中查询到的项目名。注册订阅方不区分项目,选择任意一个存在的项目名即可 |
topic_name |
simple_attribute_data_topic event_data_topic detail_data_topic |
接收数据的 Kafka 中的 topic 名。即 3.1 中提前新建好的 topic。
|
调用神策 OpenAPI 注册订阅方,返回信息示例如下:
{
"code": "SUCCESS",
"request_id": "031750310c5963b984c39480cc9138da",
"data": {
"application": {
"application_name": "{application_name}",
"application_id": "{application_id}"
"application_type": "KAFKA_BASED_APP",
"kafka_based_app_config": {
"data_format_type": "NESTED_JSON",
"profile_kafka_descriptor": {
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "simple_attribute_data_topic"
}
},
"event_kafka_descriptor": {
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "event_data_topic"
}
},
"detail_kafka_descriptor": {
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "detail_data_topic"
}
}
}
}
}
}
需要关注返回信息中
- code:标识调用神策 OpenAPI 是否成功,如果 code = SUCCESS 说明订阅成功
- application_id:订阅方唯一标识,后续调用订阅数据接口时需要携带 application_id
查询注册订阅方是否成功
可通过调用神策 OpenAPI 查询订阅方是否已经注册到神策系统中
- 接口:http://{host}:8107/api/v3/horizon/v1/data-subscription/application/get
- 请求方式:POST 请求
- HEADER
- api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
- sensorsdata-project: production
-
请求 BODY 示例:
-
{ "application_name": "{application_name}" }
- HEADER
查询注册订阅方结果的完成 curl 例子如下:
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/application/get \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{"application_name": "{application_name}"}'
接口返回示例如下:
{
"code": "SUCCESS",
"request_id": "031750310c5963b984c39480cc9138da",
"data": {
"application": {
"application_name": "{application_name}",
"application_id": "{application_id}"
"application_type": "KAFKA_BASED_APP",
"kafka_based_app_config": {
"profile_kafka_descriptor": {
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "simple_attribute_data_topic"
}
},
"event_kafka_descriptor": {
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "event_data_topic"
}
},
"detail_kafka_descriptor": {
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "detail_data_topic"
}
}
}
}
}
}
追加订阅方配置
用户最初可能只需要订阅部分类型的数据,在注册订阅方时只绑定了部分 Kafka topic。后续如果想订阅其他类型的数据,可调用 OpenAPI 追加绑定接收其他类型数据的 Kafka
- 接口:http://{host}:8107/api/v3/horizon/v1/data-subscription/application/config/append
- 请求方式:POST 请求
- HEADER
- api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
- sensorsdata-project: production
-
请求 BODY 示例:
-
{ "application_name": "{application_name}", "kafka_based_app_config": { "profile_kafka_descriptor": { "type": "CUSTOMIZED_KAFKA", "customized_kafka_descriptor": { "bootstrap_servers": "10.1.132.100:9092", "topic_name": "simple_attribute_data_topic" } } } }
- HEADER
完整的 curl 命令示例如下:
curl -L -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/application/config/append' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{
"application_name": "{application_name}",
"kafka_based_app_config": {
"profile_kafka_descriptor": {
"type": "CUSTOMIZED_KAFKA",
"customized_kafka_descriptor": {
"bootstrap_servers": "{broker_list}",
"topic_name": "horizon_subscription_public_profile_stream"
}
}
}
}'
订阅数据
通过 OpenAPI 向神策系统订阅需要的实时数据,订阅成功后,新上报的符合订阅范围的数据,会推送到用户接收数据的 Kafka 集群
- 接口地址:http://{host}:8107/api/v3/horizon/v1/data-subscription/create
- 请求方式:POST
- HEADER
- api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
- sensorsdata-project: production
- HEADER
调用订阅 OpenAPI 需要传入的部分参数说明如下:
参数名称 | 参数值示例 | 说明 |
application_id | dataworks.company_a.app.sensorsdata.cloud | 订阅方唯一标识 |
subscriber | dataworks_a | 订阅方业务标识,同一个 application_id 下可以有多个 subscriber,用户可通过 subscriber 确认订阅记录属于哪个业务方 |
订阅用户属性数据
订阅指定用户属性
示例:订阅 name
、gender
、age
三个用户属性
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
"subscription": {
"application": "{application_id}",
"subscriber": "{subscriber}",
"target_type": "ENTITY_DATA",
"entity_data_subscription_config": {
"entity_name": "user",
"attribute_type": "SIMPLE",
"simple_attribute_config": {
"attribute_names": [
"name",
"gender",
"age"
]
}
}
}
}'
订阅所有用户属性
使用通配符 * 作为属性名可订阅所有普通用户属性:
- 当请求时使用通配符 * 作为要订阅的属性名,表示订阅所有的普通用户属性,不包括虚拟属性
- 对于新创建的属性,会自动补充订阅(属性创建完成到订阅生效之间存在最多 2 分钟延迟)
示例:订阅所有普通用户属性
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
"subscription": {
"application": "{application_id}",
"subscriber": "{subscriber}",
"target_type": "ENTITY_DATA",
"entity_data_subscription_config": {
"entity_name": "user",
"attribute_type": "SIMPLE",
"simple_attribute_config": {
"attribute_names": [
"*"
]
}
}
}
}'
订阅事件数据
订阅指定事件下的部分属性
示例:订阅 payOrder
事件的 order_id
和 pay_time
属性
# 订阅 payOrder 事件的 order_id、order_time
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
"subscription": {
"application": "{application_id}",
"subscriber": "{subscriber}",
"target_type": "ENTITY_DATA",
"entity_data_subscription_config": {
"entity_name": "user",
"attribute_type": "EVENTS",
"events_attribute_config": {
"event_name": "payOrder",
"attribute_names": [
"order_id",
"pay_time"
]
}
}
}
}'
订阅指定事件下的所有属性
使用通配符 * 作为属性名可订阅事件的所有普通属性,不包括虚拟属性
示例:订阅 payOrder 事件的所有普通属性
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
"subscription": {
"application": "{application_id}",
"subscriber": "{subscriber}",
"target_type": "ENTITY_DATA",
"entity_data_subscription_config": {
"entity_name": "user",
"attribute_type": "EVENTS",
"events_attribute_config": {
"event_name": "payOrder",
"attribute_names": [
"*"
]
}
}
}
}'
订阅所有事件下所有属性
使用通配符 * 作为事件名可订阅所有普通事件,不包括虚拟事件
示例:订阅所有普通事件的所有普通属性
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
"subscription": {
"application": "{application_id}",
"subscriber": "{subscriber}",
"target_type": "ENTITY_DATA",
"entity_data_subscription_config": {
"entity_name": "user",
"attribute_type": "EVENTS",
"events_attribute_config": {
"event_name": "*",
"attribute_names": [
"*"
]
}
}
}
}'
订阅明细数据
订阅明细表的部分属性
示例:订阅明细表 cart 的 product_id 和 price 属性
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
--data '{
"subscription": {
"application": "{application_id}",
"subscriber": "{subscriber}",
"target_type": "ENTITY_DATA",
"entity_data_subscription_config": {
"entity_name": "user",
"attribute_type": "DETAILS",
"details_attribute_config": {
"detail_name": "cart",
"attribute_names": [
"product_id",
"price"
]
}
}
}
}'
订阅明细表的所有属性
使用通配符 * 作为明细属性名可订阅明细表的所有普通属性,不包括虚拟属性
示例:订阅明细表 cart 的所有普通属性
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
--data '{
"subscription": {
"application": "{application_id}",
"subscriber": "{subscriber}",
"target_type": "ENTITY_DATA",
"entity_data_subscription_config": {
"entity_name": "user",
"attribute_type": "DETAILS",
"details_attribute_config": {
"detail_name": "cart",
"attribute_names": [
"*"
]
}
}
}
}'
订阅接口返回信息
调用神策订阅 OpenAPI 后,返回信息示例如下:
{
"code": "SUCCESS",
"request_id": "5d3d3273cb2d44fd9abceefd2b5cdad5",
"data": {
"subscription_id": 16001,
"subscriber": "{subscriber}",
"application": "{application_id}",
"project_id": 2,
"target_type": "ENTITY_DATA",
"entity_data_subscription_config": {
"entity_name": "user",
"attribute_type": "EVENTS",
"events_attribute_config": {
"event_name": "payOrder",
"attribute_names": [
"order_id",
"pay_time"
]
}
},
"access_info": {
"creator_id": "",
"modifier_id": "",
"create_time": "2023-05-11T10:53:32.553Z",
"update_time": "2023-05-11T10:53:32.553Z"
}
}
}
需要关注返回信息中
- code:标识请求 OpenAPI 是否成功,如果 code = SUCCESS 说明订阅成功
- subscription_id:订阅记录 id,唯一标识一次成功的订阅
消费数据并进行处理
订阅数据成功后,对于新上报的数据中符合订阅范围的,会发送一份数据到接收数据的 Kafka 集群。
用户业务模块可以消费 Kafka 集群收到的数据进行定制化的处理。
消费数据
参考 Kafka 订阅样例 实现数据消费
数据格式
当注册订阅方时指定期望接收到的数据格式为 NESTED_JSON 时,Kafka 集群收到的数据示例如下:
-
用户属性数据
-
{ "trace_id": "117274217502270410197", "organization_id": "org-sep-xxx", "project_id": 2, "project": "production", "record": { "update_time": 1727421750034, "entity_record": { "operation_type": "UPSERT", "entity_name": "user", "attribute_type": "SIMPLE", "entity_id": { "name": "id", "data_type": "BIGINT", "value": "7356491433591617590" }, "simple_data": { "data": [{ "name": "age", "data_type": "NUMBER", "value": "25.0" }, { "name": "gender", "data_type": "STRING", "value": "男" }, { "name": "name", "data_type": "STRING", "value": "张三" }] }, "events_data": null, "details_data": null } } }
- 事件数据
-
{ "trace_id": "21727421748010075027", "organization_id": "org-sep-xxx", "project_id": 2, "project": "production", "record": { "update_time": 1727421747856, "entity_record": { "operation_type": "UPSERT", "entity_name": "user", "attribute_type": "EVENTS", "entity_id": { "name": "id", "data_type": "BIGINT", "value": "7356491433591617592" }, "simple_data": null, "events_data": { "event_name": "payOrder", "data": [{ "name": "order_id", "data_type": "STRING", "value": "oid_1234565" }, { "name": "pay_time", "data_type": "DATETIME", "value": "1720540800000" }], }, "details_data": null } } }
- 明细数据
-
{ "trace_id": "51727421749008052549", "organization_id": "org-sep-12330", "project_id": 2, "project": "production", "record": { "update_time": 1727421747865, "entity_record": { "operation_type": "UPSERT", "entity_name": "user", "attribute_type": "DETAILS", "entity_id": { "name": "user_id", "data_type": "BIGINT", "value": "7356491433591617590" }, "simple_data": null, "events_data": null, "details_data": { "detail_name": "cart", "data": [{ "name": "price", "data_type": "NUMBER", "value": "111.11" }, { "name": "product_id", "data_type": "STRING", "value": "产品5" },{ "name": "id", "data_type": "STRING", "value": "subscription_cart_cac278d47e_1" }], "primary_keys": [{ "name": "id", "data_type": "STRING", "value": "subscription_cart_cac278d47e_1" }] } } } }
当注册订阅方时指定期望接收到的数据格式为 FLATTEN_JSON 时,Kafka 集群收到的数据示例如下:
{
"trace_id": "21745403297474082219424",
"organization_id": "org-sep-xxx",
"project": "production",
"project_id": 2,
"entity_name": "user",
"update_time": 1745403294934,
"operation_type": "UPSERT",
"entity_id": 8319627862933161993,
"properties": {
"price": 111.11,
"order_id": "oid_1234565",
"pay_time": "2023-11-16 09:22:13.000000"
},
"distinct_id": "device_3970185b-ff9e-413e-acb3-133c774740771745391152227.5833",
"event": "OrderStatusChange",
"time": 1745402074755
}
字段说明
订阅接收到的数据中,部分字段说明如下
参数名称 | 说明 |
trace_id | 数据的唯一标识,可用于去重判断和问题排查 |
organization_id | 神策系统组织 id,唯一标识一个私有化部署的神策分析集群,用户无需关注 |
project_id | 项目 ID,通过该字段可以判断神策对应的项目,项目名和项目 ID 的关系可参考 项目管理 查询 |
entity_name | 实体名,默认只有用户实体 (user) |
attribute_type |
数据类型,共 3 类
|
entity_id | 实体 id,等于 events 表里的 user_id 或 users 表里的 id,参考:标识用户 |
simple_data |
普通用户属性数据 |
events_data |
事件属性数据,包含事件名称和一组属性 |
event_name | 事件名 |
details_data |
明细属性数据,包含明细名称、一组属性、主键值 |
detail_name | 明细名称,即明细表名 |
primary_keys | 明细表的主键属性名及其值,唯一标识一条明细表的数据 |
data | 包含一组属性,每个属性有名称、数据类型和属性值 |
name | 属性名,等于 events 表 或 users 表中一列的列名 |
value | 属性值,统一使用字符串进行包装 |
data_type |
属性数据类型,分为以下数据类型
|
如何取消订阅
如果用户不想接收到某些已经订阅的数据,可以通过调用神策 OpenAPI 取消订阅
查询已订阅数据
用户可通过调用 OpenAPI 查询当前已订阅的数据,按需取消订阅
- 接口地址:http://{host}:8107/api/v3/horizon/v1/data-subscription/list
- 请求方式:GET
- HEADER
- api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
- sensorsdata-project: production
- PARAM
- application: {application_id}
- subscriber: {subscriber}
- HEADER
查询已订阅数据的完整 curl 命令如下:
curl -L -X GET 'http://{host}:8107/api/v3/horizon/v1/data-subscription/list?application={application_id}&subscriber={subscriber}' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}'
接口返回信息示例如下:
{
"code": "SUCCESS",
"request_id": "031831579fc7617a600be3946389e432",
"data": {
"subscriptions": [
{
"project_id": 1,
"subscription_id": 116,
"application": "{application_id}",
"subscriber": "{subscriber}",
"target_type": "ENTITY_DATA",
"entity_data_subscription_config": {
"entity_name": "user",
"attribute_type": "SIMPLE",
"simple_attribute_config": {
"attribute_names": [
"phone_number",
"quota",
"register_time"
]
}
},
"access_info": {
"creator_id": "",
"modifier_id": "",
"create_time": "2025-04-02T09:06:10.510Z",
"update_time": "2025-04-02T09:06:10.510Z"
}
}
]
}
}
需要关注返回信息中的
- subscriber:订阅方业务标识
- subscription_id:订阅记录 id,唯一标识一次成功的订阅
取消订阅时需要传入 application_id + subscriber + subscription_id,能匹配到订阅记录时才能取消订阅
取消订阅
根据 查询已订阅数据 中查询到的已订阅的数据,调用 OpenAPI 取消订阅。调用完成后可再次查询已订阅的数据,确认取消订阅是否成功。
- 接口地址:http://{host}:8107/api/v3/horizon/v1/data-subscription/batch-delete
- 请求方式:POST
- HEADER
- api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
- sensorsdata-project: production
- HEADER
-
请求 BODY 示例如下:
-
{ "application": "{application_id}", "subscriber": "{subscriber}", "subscription_ids": [{subscription_id1}, {subscription_id2}] }
取消订阅的完整 curl 命令如下:
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/batch-delete' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{
"application":"{application_id}",
"subscriber":"{subscriber}",
"subscription_ids":[{subscription_id}]
}'
curl 命令中需要传入的部分参数说明如下:
参数名称
|
参数值示例
|
说明
|
---|---|---|
application_id | dataworks.company_a.app.sensorsdata.cloud | 订阅方唯一标识 |
subscriber | dataworks_a | 订阅方业务标识,同一个 application_id 下可以有多个 subscriber,用户可通过 subscriber 确认订阅记录属于哪个业务方 |
subscription_ids | [1001, 1002] | 要取消订阅的订阅记录 id |
删除订阅方
如果用户注册订阅方时部分信息填写错误,或者用户想更换接收订阅数据的 Kafka Topic,可调用 OpenAPI 删除已注册的订阅方并重新注册
需要注意在删除订阅方前,必须先将订阅方下已订阅的数据全部取消订阅,否则删除订阅方会失败
- 接口地址:http://{host}:8107/api/v3/horizon/v1/data-subscription/application/delete
- 请求方式:POST
- HEADER
- api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
- sensorsdata-project: production
- HEADER
-
请求 BODY 示例如下:
-
{ "application_name": "{application_name}" }
删除订阅方的完整 curl 命令如下:
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/application/delete' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{
"application_name":"{application_name}"
}'
常见问题
能否订阅历史数据
不能,订阅只对新上报的数据生效。如果用户需要历史数据,可以参考 使用 JDBC 进行数据访问 来获取历史数据。
订阅不到数据的可能原因
-
网络原因:神策默认通过 hostname 来访问接收数据的 Kafka 发送数据,需要在神策服务器 hosts 文件中配置接收数据 Kafka 集群的 hostname 和 IP 映射关系。可通过在神策服务器上
telnet {kafka_broker_hostname} 9092
的方式来验证网络连通性 - 订阅未成功:调用神策 OpenAPI 订阅数据,仅当接口返回 "code": "SUCCESS" 才表示调用成功
- 订阅方注册不合理:每注册一个订阅方,神策系统需要对应启动一个订阅任务来发送数据,会消耗一部分神策服务器资源。对于单机版的神策系统,由于资源有限,神策默认限制只为注册的第一个订阅启动任务发送数据,其余订阅方无法收到数据
-
数据未入库:只有入库的数据才会被订阅发送到 Kafka。例如用 debug 不入库模式上报数据,是不会在接收订阅数据的 Kafka 中收到数据的
免责声明
- 由于用户 Kafka 集群不稳定导致的数据推送延迟,神策方不承担恢复责任
- 由于用户 Kafka 集群不稳定导致的神策订阅数据堆积甚至数据过期,神策方不承担补数据责任
- 其他由于用户环境或用户未按照文档操作导致的数据订阅相关问题,神策方可协助定位但不承担责任