菜单

历史数据导入导出

本文档所描述的内容属于神策分析的高级使用功能,涉及较多技术细节,仅适用于历史数据导出导入,且环境数据总量不超过 10 亿条的客户参考。

如果您需要把数据迁移到其他新环境,或者环境数据总量超过 10 亿条,那么请联系您的客户成功经理或者项目经理购买数据迁移服务。如果对文档内容有疑惑,请咨询神策值班同学获取协助。

概述

神策分析开始为用户提供数据自助数据导入导出功能,即通过 API 导出原始神策分析环境中的历史数据,然后通过导入工具把历史数据导入到新建神策分析环境中。

数据导入导出准备

请检查您环境里事件属性、用户属性是否有神策保留字段,如果包含保留字段,请联系神策值班同学处理。否则会出现保留字段入库失败的情况。

为了保证查询时属性名不与系统变量名冲突,设置如下保留字段,请避免其作为事件名和属性名(properties 中的 key)使用:
date
datetime
distinct_id
event
events
event_id
first_id
id
original_id
device_id
properties
second_id
time
user_id
users
user_group 开头
user_tag 开头

数据导出

使用查询 API 通过编写 SQL 的方式,导出 users 表(含 Track Signup 数据)和 events 表中的数据(如果是 1.15.1646 及之后的神策系统版本,且使用了神策的 items 表功能,也需要导出 items 表中的数据),设置 format=profile_json/event_json ,这样导出的数据可以直接使用导入工具导入到另一项目。另外需要注意的是,SQL 在执行过程中,默认超过 10 分钟 之后会被系统强制杀死,如果希望增大超时时间可以在 SQL 语句之后添加 MAX_QUERY_EXECUTION_TIME 参数控制(时间单位为秒,建议最大超时时长不要超过 1800,以免影响系统正常查询)。

导出用户数据

使用 curl 导出 users 表的例子如下:

curl 'https://saasdemo.cloud.sensorsdata.cn/api/sql/query?token=******&project=default' \
-X POST \
--data-urlencode "q=SELECT * FROM users /*MAX_QUERY_EXECUTION_TIME=1800*/" \
--data-urlencode "format=profile_json" \
>> profile.json

导出的数据样例:

{"type":"track_signup","original_id":"anonymousId1","distinct_id":"registerId1","event":"$SignUp","time":1626343612099,"properties":{}}
{"type":"profile_set","distinct_id":"registerId1","properties":{"$update_time":1618000000000,"$is_deleted":false,"$is_login_id":true}}
{"type":"track_signup","original_id":"anonymousId2","distinct_id":"registerId2","event":"$SignUp","time":1626343612099,"properties":{}}
{"type":"profile_set","distinct_id":"registerId2","properties":{"$update_time":1599000000000,"$is_deleted":false,"$is_login_id":true}}

导出事件数据

在导出 events 表数据时,建议通过 date 字段按天导出,这样可以提高 SQL 的查询速度。

使用 curl 导出 events 表的例子如下:

curl 'https://saasdemo.cloud.sensorsdata.cn/api/sql/query?token=******&project=default' \
-X POST \
--data-urlencode "q=SELECT * FROM events where date = '2017-01-01' /*MAX_QUERY_EXECUTION_TIME=1800*/" \
--data-urlencode "format=event_json" \
>> event.json

导出的数据样例:

{"type":"track","event":"$AppStart","time_free":true,"time":1590734123713,"distinct_id":"F8C867C1-5AA8-4F89-A8ED-240BD83A76A2","properties":{"$lib":"iOS","$app_version":"1.0"}}
{"type":"track","event":"$AppClick","time_free":true,"time":1590733706742,"distinct_id":"F8C867C1-5AA8-4F89-A8ED-240BD83A76A2","properties":{"$lib":"iOS","$app_version":"1.0"}}

数据格式清洗

从 query/api 查询到的用户和事件数据必须经过数据格式清洗才能用作导入。由于导出数据的 identity 相关字段在 properties 中,如果直接用于导入,会丢失部分用户关联信息。(从老架构 id2 导出的数据可以跳过数据清洗)

# -*- coding: UTF-8 -*-
"""
@Description: 处理从 sa 的 query/sql 导出的数据
"""
 
import json
import argparse
 
BATCH_SIZE = 10000  # 可根据内存调整
 
def process_file(input_file, output_file, error_file):
    buffer = []
    error_buffer = []
    try:
        with open(input_file, 'r', encoding='utf-8') as infile, \
                open(output_file, 'w', encoding='utf-8') as outfile, \
                open(error_file, 'w', encoding='utf-8') as errfile:
 
            for line_number, line in enumerate(infile, start=1):
                try:
                    data = json.loads(line)
                    if 'properties' in data and need_process(data):
                        properties = data['properties']
                        identities = {}
 
                        # 移除 $idmap_reason
                        properties.pop('$idmap_reason', None)
 
                        # 提取 identity_ 字段
                        for key in list(properties.keys()):
                            if key.startswith('identity_') or key.startswith('$identity_'):
                                value = properties.pop(key)
                                if isinstance(value, list) and len(value) == 1:
                                    value = value[0]
                                identities[key] = value
 
                        if identities:
                            if len(identities) == 1 and '$identity_distinct_id' in identities:
                                distinct_id_value = identities['$identity_distinct_id']
                                if not isinstance(distinct_id_value, str):
                                    data['identities'] = identities
                            else:
                                data['identities'] = identities
 
                    buffer.append(json.dumps(data, ensure_ascii=False) + '\n')
 
                except Exception as e:
                    error_buffer.append(f"Line {line_number}: {str(e)}\n")
 
                # 批量写入
                if len(buffer) >= BATCH_SIZE:
                    outfile.writelines(buffer)
                    buffer.clear()
                if len(error_buffer) >= BATCH_SIZE:
                    errfile.writelines(error_buffer)
                    error_buffer.clear()
 
            # 写入剩余
            if buffer:
                outfile.writelines(buffer)
            if error_buffer:
                errfile.writelines(error_buffer)
    except FileNotFoundError as fnfe:
        print(f"Error: {str(fnfe)}")
 
def need_process(data):
    if 'identities' in data:
        return False
    if 'version' in data and data['version'] == 2:
        return False
    return True
 
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='处理从 sa 的 query/sql 导出的数据')
    parser.add_argument('--input_file', type=str, help='输入文件名')
    parser.add_argument('--output_file', type=str, default='processed_event_output.json', help='输出文件名,默认值 processed_output.json')
    parser.add_argument('--error_file', type=str, default='default_event_error_output.txt', help='错误文件名,默认值为 default_error_output.txt')
    args = parser.parse_args()
    process_file(args.input_file, args.output_file, args.error_file)

 

下载脚本到导出文件所在机器,并执行清洗

python3 clean_identity.py --input_file event.json --output_file event_ready.json --error_file event_error.log

参数说明:

  • --input_file: 用户导出 / 事件导出的文件路径
  • --output_file: 清洗后输出的结果文件
  • --error_file: 不能清洗的行,用于定位不能清洗的原因。为了保证数据完整性,需要解决失败问题,重新清洗。

清洗之后的数据文件,可以进行导入。

数据导入

导出的数据即是符合神策数据格式的数据,可以利用神策的导入工具直接导入,在导入数据的过程中,应遵循以下步骤,否则会导致 TrackSignup 逻辑混乱。

  • 先导入 users 表中的数据
  • 再导入 events 表中的数据

导入 users 表中的数据

请不要将文件内容顺序打乱。

导入 events 表中的数据

使用任意一种导入工具导入即可。

上一个
使用 JDBC 进行数据访问
下一个
订阅实时数据到 Kafka
最近修改: 2025-05-19