Subscribe to Kafka real-time data
The content of this document covers advanced usage of Sensors Analytics and involves many technical details. It is intended for experienced users of the relevant features.
Sensors Analytics is designed as an open architecture, allowing users to subscribe to real-time data to meet various usage scenarios. After the server receives data sent by the SDK, it preprocesses the data and writes it to the Kafka message queue for use by various downstream computing modules. This article will introduce the method of subscribing to Kafka data.
1.1. Subscription requirements
Subscribing to data has the following requirements:
- Subscribing to data is only supported in the private (on-premises) deployment version;
- The machine that initiates the subscription must be on the same intranet as the machines where Sensors Analytics is deployed, and must be able to resolve the hostnames of the Sensors Analytics servers;
- If you subscribe from your own machine, first map the hostnames of all Sensors Analytics servers to the subscription IPs in your machine's hosts file, and use those hostnames when subscribing;
- It is recommended to use a client version consistent with the server. Most customers run server version 2.0.0 or later, so a client version of 2.0.0 or later is recommended. To confirm the exact version, log in to the Sensors Analytics server to check, or consult the on-duty Sensors Analytics staff.
1.2. Obtain Kafka parameters
- Log in to any Sensors Analytics server
- Switch to the sa_cluster account
su - sa_cluster
- Use the following command to get the address
spadmin config get client -m kafka -p sp
For example, the output is:
{
  "broker_list": [
    "hostname1:9092",
    "hostname2:9092",
    "hostname3:9092"
  ],
  "channel_callback_partition_count": 3,
  "channel_callback_topic_name": "channel_topic",
  "extra_data_topic_name": "extra_data_topic",
  "item_partition_count": 3,
  "item_topic_name": "item_topic",
  "partitions_count": 10,
  "topic_name": "event_topic"
}
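As an illustration, the output above can be parsed programmatically to assemble the comma-separated broker string that Kafka clients expect. This is a minimal Python sketch; the hostnames are the placeholders from the example output:

```python
import json

# Example output of `spadmin config get client -m kafka -p sp`,
# abbreviated; the hostnames are the placeholders from the sample above.
config_json = '''
{
  "broker_list": ["hostname1:9092", "hostname2:9092", "hostname3:9092"],
  "partitions_count": 10,
  "topic_name": "event_topic"
}
'''

config = json.loads(config_json)

# Most Kafka clients accept the brokers as one comma-separated string.
bootstrap_servers = ",".join(config["broker_list"])
topic = config["topic_name"]

print(bootstrap_servers)  # hostname1:9092,hostname2:9092,hostname3:9092
print(topic)              # event_topic
```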
If the command output contains the field "profile_topic_name": "profile_topic", the Sensors Analytics environment has the SDF module installed. You can also confirm whether the SDF module is installed by consulting the on-duty Sensors Analytics staff. The Kafka-related parameters are explained below:
Without the SDF module:

Parameter Name | Parameter Value | Description |
---|---|---|
topic | item_topic | Subscribe to data from the Sensors items table |
topic | event_topic | Subscribe to data from the Sensors events and users tables |
partition | 3 (standalone) / 10 (cluster) | Corresponds to the value of partitions_count |
broker | hostname1:9092,hostname2:9092,hostname3:9092 | Corresponds to the value of broker_list; in a cluster, multiple hostnames are separated by commas |
With the SDF module installed:

Parameter Name | Parameter Value | Description |
---|---|---|
topic | item_topic | Subscribe to data from the Sensors items table |
topic | event_topic | Subscribe to data from the Sensors events table |
topic | profile_topic | Subscribe to data from the Sensors users table |
partition | 3 (standalone) / 10 (cluster) | Corresponds to the value of partitions_count |
broker | hostname1:9092,hostname2:9092,hostname3:9092 | Corresponds to the value of broker_list; in a cluster, multiple hostnames are separated by commas |
1.3. Subscription data
There are multiple ways to subscribe; choose the one that best fits your scenario. Two examples of starting a subscription follow.
1.3.1. Using Kafka Console Consumer
You can use Kafka's built-in Console Consumer to subscribe from the command line. For example, to start from the latest data:
bin/kafka-console-consumer.sh --bootstrap-server <bootstrap-server>:9092 --topic event_topic
The above command requires a Kafka client installed on the local machine and must be executed from the Kafka installation directory.
1.3.2. Java Code Subscription Sample
For a sample implementation, refer to the following GitHub link:
Kafka Subscription Example GitHub Link
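As a rough sketch of what such a subscriber can look like (in Python rather than the official Java sample), the following uses the third-party kafka-python package; the broker hostnames, topic, and group id are placeholders to be replaced with the values obtained in section 1.2:

```python
# Sketch of a Python subscriber (not the official sample) using the
# third-party kafka-python package (pip install kafka-python). The broker
# hostnames, topic, and group id below are placeholders; replace them with
# the values from `spadmin config get client -m kafka -p sp`.
import json

def build_consumer_config(broker_list, group_id):
    """Assemble consumer settings; kept as a pure function for clarity."""
    return {
        "bootstrap_servers": ",".join(broker_list),
        "group_id": group_id,
        "auto_offset_reset": "latest",  # use "earliest" to read history
    }

def consume(topic, cfg):
    # Imported here so the config helper above works without the package.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, **cfg)
    for message in consumer:  # message.value holds the raw JSON bytes
        record = json.loads(message.value)
        print(record.get("event"), record.get("distinct_id"))

cfg = build_consumer_config(
    ["hostname1:9092", "hostname2:9092", "hostname3:9092"], "my_subscriber"
)
print(cfg["bootstrap_servers"])  # hostname1:9092,hostname2:9092,hostname3:9092
# consume("event_topic", cfg)   # uncomment to start the subscription
```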
1.4. Data Format
The format of the subscribed data is essentially the same as the Data Format.
Example of Data Reported by Java SDK
{
  "_track_id": -1302294273,
  "lib": {
    "$lib": "Java",
    "$lib_method": "code",
    "$lib_version": "3.1.15",
    "$lib_detail": "com.sensorsdata.analytics.javasdk.TestSA##track##TestSA.java##91"
  },
  "distinct_id": "test0932",
  "type": "track",
  "event": "Order",
  "properties": {
    "$lib": "Java",
    "isLogin": false,
    "order_id": "real_1272",
    "$lib_version": "3.1.15"
  }
}
Example of Subscribed Data
{
  "_track_id": -1302294273,
  "lib": {
    "$lib": "Java",
    "$lib_method": "code",
    "$lib_version": "3.1.15",
    "$lib_detail": "com.sensorsdata.analytics.javasdk.TestSA##track##TestSA.java##91"
  },
  "distinct_id": "test0932",
  "type": "track",
  "event": "Order",
  "properties": {
    "$lib": "Java",
    "isLogin": false,
    "order_id": "real_1272",
    "$lib_version": "3.1.15",
    "$ip": "10.90.28.102",
    "$is_login_id": false
  },
  "time": 1600400230612,
  "project": "default",
  "token": "super",
  "extractor": {
    "f": null,
    "o": 0,
    "n": null,
    "s": 25,
    "c": 25,
    "e": "debugboxcreate1038.sa-DebugService"
  },
  "recv_time": 1600400230612,
  "ngx_ip": "10.90.28.102",
  "process_time": 1600400230612,
  "map_id": "zjj-0932",
  "user_id": 81311457452485460,
  "project_id": 24,
  "ver": 2
}
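A downstream consumer only needs the business fields and can ignore the internal ones. A minimal Python sketch, using a truncated copy of the record above:

```python
import json

# Truncated copy of the subscribed record shown above; internal fields
# such as "extractor" are omitted for brevity.
raw = '''
{
  "distinct_id": "test0932",
  "type": "track",
  "event": "Order",
  "properties": {"order_id": "real_1272", "isLogin": false},
  "time": 1600400230612,
  "project": "default",
  "project_id": 24,
  "ver": 2
}
'''

record = json.loads(raw)

# Read only the fields you need; internal fields (project_id, ver, ...)
# can simply be left unread.
print(record["event"], record["properties"]["order_id"])  # Order real_1272
```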
Sensors Data adds some internal fields to the subscribed Kafka data, such as the progress information in extractor and the project_id. You do not need to pay attention to these fields; just parse out the fields you need. Some of the internal fields that may appear are explained below:
Parameter Name | Description |
---|---|
_track_id | A randomly generated value when the front-end SDK tracks, used for deduplication, and not written to the events table. |
project | English name of the project, used to determine the corresponding project in Sensors Data. |
token | Data import token; see item 7 of Frequently Asked Questions about Data Import |
recv_time | The time the data was received; equal to the value of $receive_time in the events table |
user_id | Equal to user_id in the events table and id in the users table; see: Identifying Users |
For descriptions of the distinct_id, type, event, properties, and time fields, see: Data Format
1.5. Common Questions
1.5.1. Can historical data be subscribed to when consuming data from Kafka?
Yes. A single-node Sensors Data deployment retains Kafka data for one year by default, and a cluster retains it for one week by default. If you subscribe from the command line with the Kafka Console Consumer, add --from-beginning to consume from the earliest retained data:
bin/kafka-console-consumer.sh --bootstrap-server hostname:9092 --topic event_topic --from-beginning
1.5.2. Possible reasons for not being able to subscribe to Kafka data
- Network issues: Sensors Data subscribes to Kafka data by hostname. Configure the mapping between the Sensors Data servers' hostnames and IPs in the hosts file on the machine that subscribes to Kafka, then confirm connectivity with telnet hostname 9092.
- Kafka version compatibility: a higher-version server is compatible with lower-version clients, so the subscribing client's Kafka version should not be greater than the Kafka version of the Sensors Data server.
- Debug-mode data: if data is reported in debug mode for testing and is not written to the database, it cannot be subscribed to from event_topic.
1.5.3. Can you specify the project to subscribe to Kafka data?
No. The data subscribed from a given topic covers all projects; differentiate projects using the project field in the JSON data.
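For example, a consumer that wants only one project can filter each record on its project field itself. A minimal Python sketch (the project name is a placeholder):

```python
import json

def is_target_project(raw_message, project_name):
    """Return True if the subscribed JSON record belongs to project_name."""
    record = json.loads(raw_message)
    return record.get("project") == project_name

print(is_target_project('{"event": "Order", "project": "default"}', "default"))  # True
print(is_target_project('{"event": "Order", "project": "other"}', "default"))    # False
```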
1.5.4. Can Kafka data be subscribed by groups?
Yes. When subscribing to Kafka data through code or tools, you can set the group.id parameter to distinguish consumer groups. For the precise semantics, see the Kafka official documentation.
1.5.5. Is it normal to subscribe to event data through the profile_topic?
Yes, this is normal. The profile_topic carries all data written to the users table. If event data triggers user association and writes first_id and second_id to the users table, that data will also appear in profile_topic. For the user-association logic, see: Identifying Users
Note: The content of this document is a technical document that provides details on how to use the Sensors product and does not include sales terms; the specific content of enterprise procurement products and technical services shall be subject to the commercial procurement contract.