菜单

订阅实时数据到 Kafka

本文档所描述的内容属于神策分析的高级使用功能,涉及调用神策 OpenAPI,仅适用于有相关经验的用户参考。

概述

神策系统采用开放架构设计,支持用户通过订阅实时数据满足更多使用场景。服务端在接收到 SDK 发送的数据后,会进行处理并入库,同时支持将数据写入 Kafka 消息队列,供下游定制化计算模块使用。
本文将介绍如何将实时数据订阅到 Kafka 消息队列。

订阅要求

订阅实时数据需要满足以下要求:

  • 仅私有部署版支持通过 Kafka 实时订阅数据
  • 需准备独立的 Kafka 集群用于接收订阅数据(非神策分析 Kafka 集群,可以是自建 Kafka 或阿里云等云版 Kafka,暂不支持 kerberos 认证)
  • 神策分析部署机器必须能访问接收数据的 Kafka 集群, 可通过在神策分析部署机器上 telnet {kafka_broker_hostname} 9092 的方式验证连通性
  • 建议使用与神策服务端一致的 Kafka 版本(当前版本为 2.8.2)
  • 神策分析版本要求:分析云 3.0.1+ 套餐

准备工作

准备接收数据的 Kafka 及 Topic

用户需自备 Kafka 集群用于接收订阅数据,神策分析会将实时数据推送至此集群。需提前在 Kafka 集群中创建以下 Topic

数据类型 Topic名称示例 说明

用户属性数据

simple_attribute_data_topic

用于接收用户属性数据

用户事件数据

event_data_topic

用于接收用户事件数据

用户明细数据

detail_data_topic

用于接收用户明细数据

准备 OpenAPI 认证信息

订阅实时数据需调用神策 OpenAPI 接口,请参考 OpenAPI 认证方式 获取 API 密钥并构建请求头(api-key、sensorsdata-project 等)。

订阅相关 OpenAPI  接口与例子

注册订阅方

在正式订阅数据前,需要先在神策系统中注册订阅方。 订阅方信息包括:

  • application_name:订阅方业务唯一标识,由用户业务自己取名
  • 接收数据的 Kafka 信息:即 3.1 中用户自备 Kafka 集群的 broker 地址及 Topic

神策系统会基于 application_name 为订阅方生成一个唯一标识 application_id,并将 application_id 和 接收数据的 Kafka 进行绑定
后续调用订阅数据接口时只需要携带 application_id,此 application_id 下订阅的所有数据都会发送到绑定的 Kafka 

注意:用户一般只需要注册一个订阅方,如果下游计算业务需要的数据不同,可订阅全部所需数据到 Kafka 后,不同业务使用不同的 group.id 自行消费过滤

调用 OpenAPI 注册订阅方

  • 接口:http://{host}:8107/api/v3/horizon/v1/data-subscription/application/add
  • 请求方式:POST 请求
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project: production
  • 请求 BODY 示例

{
  "application": {
    "application_name": "{application_name}",
    "application_type": "KAFKA_BASED_APP",
    "kafka_based_app_config": {
      "data_format_type": "NESTED_JSON",
      "profile_kafka_descriptor": {
        "type": "CUSTOMIZED_KAFKA",
        "customized_kafka_descriptor": {
          "bootstrap_servers": "{broker_list}",
          "topic_name": "simple_attribute_data_topic"
        }
      },
      "event_kafka_descriptor": {
        "type": "CUSTOMIZED_KAFKA",
        "customized_kafka_descriptor": {
          "bootstrap_servers": "{broker_list}",
          "topic_name": "event_data_topic"
        }
      },
      "detail_kafka_descriptor": {
        "type": "CUSTOMIZED_KAFKA",
        "customized_kafka_descriptor": {
          "bootstrap_servers": "{broker_list}",
          "topic_name": "detail_data_topic"
        }
      }
    }
  }
}

注册订阅方的完整 curl 命令示例如下:

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/application/add' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{
    "application":{
        "application_name": "{application_name}",
        "application_type": "KAFKA_BASED_APP",
        "kafka_based_app_config": {
            "data_format_type": "NESTED_JSON",             
            "profile_kafka_descriptor":{
                "type": "CUSTOMIZED_KAFKA",
                "customized_kafka_descriptor": {
                    "bootstrap_servers": "{broker_list}",
                    "topic_name": "simple_attribute_data_topic"
                }
            },
            "event_kafka_descriptor":{
                "type": "CUSTOMIZED_KAFKA",
                "customized_kafka_descriptor": {
                    "bootstrap_servers": "{broker_list}",
                    "topic_name": "event_data_topic"
                }
            },
            "detail_kafka_descriptor":{
                "type": "CUSTOMIZED_KAFKA",
                "customized_kafka_descriptor": {
                    "bootstrap_servers": "{broker_list}",
                    "topic_name": "detail_data_topic"
                }
            }
        }
    }
}'

curl 命令中需要传入的相关参数说明如下:

参数名称 参数值示例 说明
api-key #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx OpenAPI 接口认证信息,参考 OpenAPI 认证方式
application_id dataworks.company_a.app.sensorsdata.cloud 订阅方唯一标识
broker_list hostname1:9092,hostname2:9092,hostname3:9092 用户接收数据的 Kafka 对应 broker_list 里的值,集群有多个 hostname,用英文逗号分隔
data_format_type NESTED_JSON

订阅方期望接收到的数据格式。目前支持的数据格式有两种,默认数据格式为 NESTED_JSON

  • NESTED_JSON:接收到的数据是嵌套类型的 Json,每个字段的信息嵌套在一个结构体中,每个字段都有明确的 name、data_type 和 value。例如:

    .....
    {
        "name": "order_id",
        "data_type": "STRING",
        "value": "oid_1234565"
    }
    .....
  • FLATTEN_JSON:接收的数据格式是扁平化的 Json, 字段名直接作为键,值是具体的字段内容。例如:

    .....
    "order_id": "oid_1234565"
    .....
host 192.168.1.1 神策分析集群的域名 或神策分析集群 元数据节点的 IP
project_name production 项目英文名称 ,即 3.2.2 中查询到的项目名。注册订阅方不区分项目,选择任意一个存在的项目名即可
topic_name

simple_attribute_data_topic

event_data_topic

detail_data_topic

接收数据的 Kafka 中的 topic 名。即 3.1 中提前新建好的 topic。

  • 如需订阅用户属性数据,在请求 body 中携带 profile_kafka_descriptor,topic_name = 接收用户属性数据的 topic name
  • 如需订阅事件数据,在请求 body 中携带 event_kafka_descriptor,topic_name = 接收事件数据的 topic name
  • 如需订阅用户属性数据,在请求 body 中携带 detail_kafka_descriptor,topic_name = 接收明细数据的 topic name

调用神策 OpenAPI 注册订阅方,返回信息示例如下:

{
  "code": "SUCCESS",
  "request_id": "031750310c5963b984c39480cc9138da",
  "data": {
    "application": {
      "application_name": "{application_name}",
      "application_id": "{application_id}"
      "application_type": "KAFKA_BASED_APP",
      "kafka_based_app_config": {
        "data_format_type": "NESTED_JSON",         
        "profile_kafka_descriptor": {
          "type": "CUSTOMIZED_KAFKA",
          "customized_kafka_descriptor": {
            "bootstrap_servers": "{broker_list}",
            "topic_name": "simple_attribute_data_topic"
          }
        },
        "event_kafka_descriptor": {
          "type": "CUSTOMIZED_KAFKA",
          "customized_kafka_descriptor": {
            "bootstrap_servers": "{broker_list}",
            "topic_name": "event_data_topic"
          }
        },
        "detail_kafka_descriptor": {
          "type": "CUSTOMIZED_KAFKA",
          "customized_kafka_descriptor": {
            "bootstrap_servers": "{broker_list}",
            "topic_name": "detail_data_topic"
          }
        }
      }
    }
  }
}

需要关注返回信息中

  • code:标识调用神策 OpenAPI 是否成功,如果 code = SUCCESS 说明订阅成功
  • application_id:订阅方唯一标识,后续调用订阅数据接口时需要携带 application_id

查询注册订阅方是否成功

可通过调用神策 OpenAPI 查询订阅方是否已经注册到神策系统中

  • 接口:http://{host}:8107/api/v3/horizon/v1/data-subscription/application/get
  • 请求方式:POST 请求
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project: production
    • 请求 BODY 示例

    • {
        "application_name": "{application_name}"
      }

查询注册订阅方结果的完成 curl 例子如下:

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/application/get \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{"application_name": "{application_name}"}'

接口返回示例如下:

{
  "code": "SUCCESS",
  "request_id": "031750310c5963b984c39480cc9138da",
  "data": {
    "application": {
      "application_name": "{application_name}",
      "application_id": "{application_id}"
      "application_type": "KAFKA_BASED_APP",
      "kafka_based_app_config": {
        "profile_kafka_descriptor": {
          "type": "CUSTOMIZED_KAFKA",
          "customized_kafka_descriptor": {
            "bootstrap_servers": "{broker_list}",
            "topic_name": "simple_attribute_data_topic"
          }
        },
        "event_kafka_descriptor": {
          "type": "CUSTOMIZED_KAFKA",
          "customized_kafka_descriptor": {
            "bootstrap_servers": "{broker_list}",
            "topic_name": "event_data_topic"
          }
        },
        "detail_kafka_descriptor": {
          "type": "CUSTOMIZED_KAFKA",
          "customized_kafka_descriptor": {
            "bootstrap_servers": "{broker_list}",
            "topic_name": "detail_data_topic"
          }
        }
      }
    }
  }
}

追加订阅方配置

用户最初可能只需要订阅部分类型的数据,在注册订阅方时只绑定了部分 Kafka topic。后续如果想订阅其他类型的数据,可调用 OpenAPI 追加绑定接收其他类型数据的 Kafka

  • 接口:http://{host}:8107/api/v3/horizon/v1/data-subscription/application/config/append
  • 请求方式:POST 请求
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project: production
    • 请求 BODY 示例

    • {
          "application_name": "{application_name}",
          "kafka_based_app_config": {
              "profile_kafka_descriptor": {
                  "type": "CUSTOMIZED_KAFKA",
                  "customized_kafka_descriptor": {
                      "bootstrap_servers": "10.1.132.100:9092",
                      "topic_name": "simple_attribute_data_topic"
                  }
              }
          }
      }

完整的 curl 命令示例如下:

curl -L -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/application/config/append' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{
    "application_name": "{application_name}",
    "kafka_based_app_config": {
        "profile_kafka_descriptor": {
            "type": "CUSTOMIZED_KAFKA",
            "customized_kafka_descriptor": {
                "bootstrap_servers": "{broker_list}",
                "topic_name": "horizon_subscription_public_profile_stream"
            }
        }
    }
}'

订阅数据

通过 OpenAPI 向神策系统订阅需要的实时数据,订阅成功后,新上报的符合订阅范围的数据,会推送到用户接收数据的 Kafka 集群

  • 接口地址:http://{host}:8107/api/v3/horizon/v1/data-subscription/create
  • 请求方式:POST
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project: production

调用订阅 OpenAPI 需要传入的部分参数说明如下:

参数名称 参数值示例 说明
application_id dataworks.company_a.app.sensorsdata.cloud 订阅方唯一标识
subscriber dataworks_a 订阅方业务标识,同一个 application_id 下可以有多个 subscriber,用户可通过 subscriber 确认订阅记录属于哪个业务方

订阅用户属性数据

订阅指定用户属性

示例:订阅 namegenderage 三个用户属性

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
    "subscription": {
        "application": "{application_id}",
        "subscriber": "{subscriber}",
        "target_type": "ENTITY_DATA",
        "entity_data_subscription_config": {
            "entity_name": "user",
            "attribute_type": "SIMPLE",
            "simple_attribute_config": {
                "attribute_names": [
                    "name",
                    "gender",
                    "age"
                ]
            }
        }
    }
}'

订阅所有用户属性

使用通配符 * 作为属性名可订阅所有普通用户属性:

  • 当请求时使用通配符 * 作为要订阅的属性名,表示订阅所有的普通用户属性,不包括虚拟属性
  • 对于新创建的属性,会自动补充订阅(属性创建完成到订阅生效之间存在最多 2 分钟延迟)

示例:订阅所有普通用户属性

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
    "subscription": {
        "application": "{application_id}",
        "subscriber": "{subscriber}",
        "target_type": "ENTITY_DATA",
        "entity_data_subscription_config": {
            "entity_name": "user",
            "attribute_type": "SIMPLE",
            "simple_attribute_config": {
                "attribute_names": [
                    "*"
                ]
            }
        }
    }
}'

订阅事件数据

订阅指定事件下的部分属性

示例:订阅 payOrder 事件的 order_id 和 pay_time 属性

# 订阅 payOrder 事件的 order_id、order_time
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
  "subscription": {
    "application": "{application_id}",
    "subscriber": "{subscriber}",
    "target_type": "ENTITY_DATA",
    "entity_data_subscription_config": {
      "entity_name": "user",
      "attribute_type": "EVENTS",
      "events_attribute_config": {
        "event_name": "payOrder",
        "attribute_names": [
          "order_id",
          "pay_time"
        ]
      }
    }
  }
}'

订阅指定事件下的所有属性

使用通配符 * 作为属性名可订阅事件的所有普通属性,不包括虚拟属性

示例:订阅 payOrder 事件的所有普通属性

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
  "subscription": {
    "application": "{application_id}",
    "subscriber": "{subscriber}",
    "target_type": "ENTITY_DATA",
    "entity_data_subscription_config": {
      "entity_name": "user",
      "attribute_type": "EVENTS",
      "events_attribute_config": {
        "event_name": "payOrder",
        "attribute_names": [
          "*"
        ]
      }
    }
  }
}'

订阅所有事件下所有属性

使用通配符 * 作为事件名可订阅所有普通事件,不包括虚拟事件

示例:订阅所有普通事件的所有普通属性

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
  "subscription": {
    "application": "{application_id}",
    "subscriber": "{subscriber}",
    "target_type": "ENTITY_DATA",
    "entity_data_subscription_config": {
      "entity_name": "user",
      "attribute_type": "EVENTS",
      "events_attribute_config": {
        "event_name": "*",
        "attribute_names": [
          "*"
        ]
      }
    }
  }
}'

订阅明细数据

订阅明细表的部分属性

示例:订阅明细表 cart 的 product_id 和 price 属性

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \ 
--data '{
  "subscription": {
    "application": "{application_id}",
    "subscriber": "{subscriber}",
    "target_type": "ENTITY_DATA",
    "entity_data_subscription_config": {
      "entity_name": "user",
      "attribute_type": "DETAILS",
      "details_attribute_config": {
        "detail_name": "cart",
        "attribute_names": [
          "product_id",
          "price"
        ]
      }
    }
  }
}'

订阅明细表的所有属性

使用通配符 * 作为明细属性名可订阅明细表的所有普通属性,不包括虚拟属性

示例:订阅明细表 cart 的所有普通属性

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \ 
--data '{
  "subscription": {
    "application": "{application_id}",
    "subscriber": "{subscriber}",
    "target_type": "ENTITY_DATA",
    "entity_data_subscription_config": {
      "entity_name": "user",
      "attribute_type": "DETAILS",
      "details_attribute_config": {
        "detail_name": "cart",
        "attribute_names": [
          "*"
        ]
      }
    }
  }
}'

订阅接口返回信息

调用神策订阅 OpenAPI 后,返回信息示例如下:

{
    "code": "SUCCESS",
    "request_id": "5d3d3273cb2d44fd9abceefd2b5cdad5",
    "data": {
        "subscription_id": 16001,
        "subscriber": "{subscriber}",
        "application": "{application_id}",
        "project_id": 2,
        "target_type": "ENTITY_DATA",
        "entity_data_subscription_config": {
            "entity_name": "user",
            "attribute_type": "EVENTS",
            "events_attribute_config": {
                "event_name": "payOrder",
                "attribute_names": [
                    "order_id",
                    "pay_time"
                ]
            }
        },
        "access_info": {
            "creator_id": "",
            "modifier_id": "",
            "create_time": "2023-05-11T10:53:32.553Z",
            "update_time": "2023-05-11T10:53:32.553Z"
        }
    }
}

需要关注返回信息中

  • code:标识请求 OpenAPI 是否成功,如果 code = SUCCESS 说明订阅成功
  • subscription_id:订阅记录 id,唯一标识一次成功的订阅

消费数据并进行处理

订阅数据成功后,对于新上报的数据中符合订阅范围的,会发送一份数据到接收数据的 Kafka 集群。

用户业务模块可以消费 Kafka 集群收到的数据进行定制化的处理。

消费数据

参考 Kafka 订阅样例 实现数据消费

数据格式

当注册订阅方时指定期望接收到的数据格式为 NESTED_JSON 时,Kafka 集群收到的数据示例如下:

  • 用户属性数据

  • {
        "trace_id": "117274217502270410197",
        "organization_id": "org-sep-xxx",
        "project_id": 2,
        "project": "production",  
        "record": {
            "update_time": 1727421750034,
            "entity_record": {
                "operation_type": "UPSERT",
                "entity_name": "user",
                "attribute_type": "SIMPLE",
                "entity_id": {
                    "name": "id",
                    "data_type": "BIGINT",
                    "value": "7356491433591617590"
                },
                "simple_data": {
                    "data": [{
                        "name": "age",
                        "data_type": "NUMBER",
                        "value": "25.0"
                    }, {
                        "name": "gender",
                        "data_type": "STRING",
                        "value": "男"
                    }, {
                        "name": "name",
                        "data_type": "STRING",
                        "value": "张三"
                    }]
                },
                "events_data": null,
                "details_data": null
            }
        }
    }
  • 事件数据
  • {
        "trace_id": "21727421748010075027",
        "organization_id": "org-sep-xxx",
        "project_id": 2,
        "project": "production",  
        "record": {
            "update_time": 1727421747856,
            "entity_record": {
                "operation_type": "UPSERT",
                "entity_name": "user",
                "attribute_type": "EVENTS",
                "entity_id": {
                    "name": "id",
                    "data_type": "BIGINT",
                    "value": "7356491433591617592"
                },
                "simple_data": null,
                "events_data": {
                    "event_name": "payOrder",
                    "data": [{
                        "name": "order_id",
                        "data_type": "STRING",
                        "value": "oid_1234565"
                    }, {
                        "name": "pay_time",
                        "data_type": "DATETIME",
                        "value": "1720540800000"
                    }],
                },
                "details_data": null
            }
        }
    }
  • 明细数据
  • {
        "trace_id": "51727421749008052549",
        "organization_id": "org-sep-12330",
        "project_id": 2,
        "project": "production",
        "record": {
            "update_time": 1727421747865,
            "entity_record": {
                "operation_type": "UPSERT",
                "entity_name": "user",
                "attribute_type": "DETAILS",
                "entity_id": {
                    "name": "user_id",
                    "data_type": "BIGINT",
                    "value": "7356491433591617590"
                },
                "simple_data": null,
                "events_data": null,
                "details_data": {
                    "detail_name": "cart",
                    "data": [{
                        "name": "price",
                        "data_type": "NUMBER",
                        "value": "111.11"
                    }, {
                        "name": "product_id",
                        "data_type": "STRING",
                        "value": "产品5"
                    },{
                        "name": "id",
                        "data_type": "STRING",
                        "value": "subscription_cart_cac278d47e_1"
                    }],
                    "primary_keys": [{
                        "name": "id",
                        "data_type": "STRING",
                        "value": "subscription_cart_cac278d47e_1"
                    }]
                }
            }
        }
    }

当注册订阅方时指定期望接收到的数据格式为 FLATTEN_JSON 时,Kafka 集群收到的数据示例如下:

{
  "trace_id": "21745403297474082219424",
  "organization_id": "org-sep-xxx",
  "project": "production",
  "project_id": 2,
  "entity_name": "user",
  "update_time": 1745403294934,
  "operation_type": "UPSERT",
  "entity_id": 8319627862933161993,
  "properties": {
    "price": 111.11,
    "order_id": "oid_1234565",
    "pay_time": "2023-11-16 09:22:13.000000"
  },
  "distinct_id": "device_3970185b-ff9e-413e-acb3-133c774740771745391152227.5833",
  "event": "OrderStatusChange",
  "time": 1745402074755
}

字段说明

订阅接收到的数据中,部分字段说明如下

参数名称 说明
trace_id 数据的唯一标识,可用于去重判断和问题排查
organization_id 神策系统组织 id,唯一标识一个私有化部署的神策分析集群,用户无需关注
project_id 项目 ID,通过该字段可以判断神策对应的项目,项目名和项目 ID 的关系可参考  项目管理  查询
entity_name 实体名,默认只有用户实体 (user)
attribute_type

数据类型,共 3 类

  • SIMPLE:普通用户属性
  • EVENTS:事件
  • DETAILS:明细
entity_id 实体 id,等于 events 表里的 user_id 或 users 表里的 id,参考:标识用户
simple_data

普通用户属性数据

events_data

事件属性数据,包含事件名称和一组属性

event_name 事件名
details_data

明细属性数据,包含明细名称、一组属性、主键值

detail_name 明细名称,即明细表名
primary_keys 明细表的主键属性名及其值,唯一标识一条明细表的数据
data 包含一组属性,每个属性有名称、数据类型和属性值
name 属性名,等于 events 表 或 users 表中一列的列名
value 属性值,统一使用字符串进行包装
data_type

属性数据类型,分为以下数据类型

  • BOOL:使用 Boolean.parseBoolean (value) 接收属性值
  • INT:整数,使用 Long.parseLong (value) 接收属性值
  • NUMBER:整数或浮点数,使用 Double.parseDouble (value) 接收属性值
  • STRING:字符串
  • LIST:列表,使用 Arrays.asList (value.split ("\n")) 接收属性值
  • DATETIME:13 位毫秒时间戳,使用 Long.parseLong (value) 接收属性值
  • DATE:日期字符串,格式为 yyyy-MM-dd 或 yyyy-MM-dd HH:mm:ss 等

  • BIGINT:长整型,使用 Long.parseLong (value) 接收属性值
  • DECIMAL:使用 new BigDecimal (value) 接收属性值

如何取消订阅

如果用户不想接收到某些已经订阅的数据,可以通过调用神策 OpenAPI 取消订阅

查询已订阅数据

用户可通过调用 OpenAPI 查询当前已订阅的数据,按需取消订阅

  • 接口地址:http://{host}:8107/api/v3/horizon/v1/data-subscription/list
  • 请求方式:GET
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project: production
    • PARAM
      • application: {application_id}
      • subscriber: {subscriber}

查询已订阅数据的完整 curl 命令如下:

curl -L -X GET 'http://{host}:8107/api/v3/horizon/v1/data-subscription/list?application={application_id}&subscriber={subscriber}' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}'

接口返回信息示例如下:

{
  "code": "SUCCESS",
  "request_id": "031831579fc7617a600be3946389e432",
  "data": {
    "subscriptions": [
      {
        "project_id": 1,
        "subscription_id": 116,
        "application": "{application_id}",
        "subscriber": "{subscriber}",
        "target_type": "ENTITY_DATA",
        "entity_data_subscription_config": {
          "entity_name": "user",
          "attribute_type": "SIMPLE",
          "simple_attribute_config": {
            "attribute_names": [
              "phone_number",
              "quota",
              "register_time"
            ]
          }
        },
        "access_info": {
          "creator_id": "",
          "modifier_id": "",
          "create_time": "2025-04-02T09:06:10.510Z",
          "update_time": "2025-04-02T09:06:10.510Z"
        }
      }
    ]
  }
}

需要关注返回信息中的

  • subscriber:订阅方业务标识
  • subscription_id:订阅记录 id,唯一标识一次成功的订阅

取消订阅时需要传入 application_id + subscriber + subscription_id,能匹配到订阅记录时才能取消订阅

取消订阅

根据 查询已订阅数据 中查询到的已订阅的数据,调用 OpenAPI 取消订阅。调用完成后可再次查询已订阅的数据,确认取消订阅是否成功。

  • 接口地址:http://{host}:8107/api/v3/horizon/v1/data-subscription/batch-delete
  • 请求方式:POST
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project: production
  • 请求 BODY 示例如下

  • {
        "application": "{application_id}",
        "subscriber": "{subscriber}",
        "subscription_ids": [{subscription_id1}, {subscription_id2}]
    }

取消订阅的完整 curl 命令如下:

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/batch-delete' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{
    "application":"{application_id}",
    "subscriber":"{subscriber}",
    "subscription_ids":[{subscription_id}]
}'

curl 命令中需要传入的部分参数说明如下:

参数名称
参数值示例
说明
application_id dataworks.company_a.app.sensorsdata.cloud 订阅方唯一标识
subscriber dataworks_a 订阅方业务标识,同一个 application_id 下可以有多个 subscriber,用户可通过 subscriber 确认订阅记录属于哪个业务方
subscription_ids [1001, 1002] 要取消订阅的订阅记录 id

删除订阅方

如果用户注册订阅方时部分信息填写错误,或者用户想更换接收订阅数据的 Kafka Topic,可调用 OpenAPI 删除已注册的订阅方并重新注册

需要注意在删除订阅方前,必须先将订阅方下已订阅的数据全部取消订阅,否则删除订阅方会失败

  • 接口地址:http://{host}:8107/api/v3/horizon/v1/data-subscription/application/delete
  • 请求方式:POST
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project: production
  • 请求 BODY 示例如下

  • {
        "application_name": "{application_name}"
    }

删除订阅方的完整 curl 命令如下:

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/application/delete' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{
    "application_name":"{application_name}"
}'

常见问题

能否订阅历史数据

不能,订阅只对新上报的数据生效。如果用户需要历史数据,可以参考 使用 JDBC 进行数据访问 来获取历史数据。

订阅不到数据的可能原因

  • 网络原因:神策默认通过 hostname 来访问接收数据的 Kafka 发送数据,需要在神策服务器 hosts 文件中配置接收数据 Kafka 集群的 hostname 和 IP 映射关系。可通过在神策服务器上 telnet {kafka_broker_hostname} 9092 的方式来验证网络连通性

  • 订阅未成功:调用神策 OpenAPI 订阅数据,仅当接口返回 "code": "SUCCESS" 才表示调用成功
  • 订阅方注册不合理:每注册一个订阅方,神策系统需要对应启动一个订阅任务来发送数据,会消耗一部分神策服务器资源。对于单机版的神策系统,由于资源有限,神策默认限制只为注册的第一个订阅启动任务发送数据,其余订阅方无法收到数据
  • 数据未入库:只有入库的数据才会被订阅发送到 Kafka。例如用 debug 不入库模式上报数据,是不会在接收订阅数据的 Kafka 中收到数据的

免责声明

  • 由于用户 Kafka 集群不稳定导致的数据推送延迟,神策方不承担恢复责任
  • 由于用户 Kafka 集群不稳定导致的神策订阅数据堆积甚至数据过期,神策方不承担补数据责任
  • 其他由于用户环境或用户未按照文档操作导致的数据订阅相关问题,神策方可协助定位但不承担责任
上一个
历史数据导入导出
下一个
高级功能
最近修改: 2025-05-08