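I. Background
1 The developers asked for the data in MongoDB to be imported into ClickHouse so that they could analyze it, which is what led to the work below. I first looked at a number of third-party data migration tools. Tapdata, for example, has this feature, but in the few times I used it, it frequently reported errors, and it is also a paid product. So I decided to solve the problem with my own Python scripts.
2 Whether the data imports cleanly turned out to depend heavily on the column types of the table created in ClickHouse (abbreviated CK below).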
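Since the import depends so much on column types, it can help to look at which Python types each MongoDB field actually holds before choosing the CK column types. The following is only a sketch of that idea: the function name tally_field_types is hypothetical, and it works on any iterable of dict-like documents.

```python
from collections import Counter, defaultdict
from datetime import datetime


def tally_field_types(docs):
    """Return {field: {type_name: occurrences}} for an iterable of documents."""
    seen = defaultdict(Counter)
    for doc in docs:
        for field, value in doc.items():
            # Record the Python type name of every value seen for this field
            seen[field][type(value).__name__] += 1
    return {field: dict(counts) for field, counts in seen.items()}


# Small in-memory sample standing in for real MongoDB documents
sample_docs = [
    {"_id": 1, "create_time": datetime(2023, 4, 12, 8, 30), "sex": 1},
    {"_id": 2, "create_time": "2023-04-12 09:00:00", "sex": None},
]
for field, counts in tally_field_types(sample_docs).items():
    print(field, counts)
```

With pymongo, the same function could be fed a sample such as collection.find().limit(1000). A field that shows more than one type (for example both datetime and str) is a likely candidate for import errors on a strictly typed column.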
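II. Create the target table in CK in advance
Below is the table I created in CK, core_customer_test. Columns such as create_time and update_time should really be of type DateTime, but I chose String here, because otherwise my import script reported errors.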
CREATE TABLE mongodb.core_customer_test
(
    `create_time` Nullable(String),
    `dep_id` Nullable(String),
    `is_delete` Nullable(String),
    `invitor_userid` Nullable(String),
    `name` Nullable(String),
    `qx_name` Nullable(String),
    `unionid` Nullable(String),
    `video_add_time` Nullable(String),
    `lunar_birthday` Nullable(String),
    `external_userid` Nullable(String),
    `follow_user` Nullable(String),
    `_id` String,
    `birthday` Nullable(String),
    `update_time` Nullable(String),
    `sex` Nullable(String)
)
ENGINE = MergeTree
ORDER BY (_id);

III. Data sync scripts
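Both scripts in this section turn each MongoDB document into a list of strings, in the same column order as the INSERT statement. As a minimal sketch of that step in isolation (the names COLUMNS, doc_to_row and insert_sql are mine and do not appear in the scripts):

```python
# Column order shared by the INSERT statement and the converted rows
COLUMNS = [
    "_id", "birthday", "create_time", "dep_id", "is_delete", "name",
    "qx_name", "sex", "update_time", "video_add_time", "lunar_birthday",
    "invitor_userid", "external_userid", "follow_user",
]


def doc_to_row(doc, columns=COLUMNS):
    """Convert one document to a list of strings in column order.

    A missing field becomes ''. A field that exists but holds None
    becomes the string 'None', which is also what str(row.get(...)) does.
    """
    return [str(doc.get(col, "")) for col in columns]


def insert_sql(table, columns=COLUMNS):
    """Build the 'INSERT INTO ... VALUES' prefix for the given columns."""
    return f"INSERT INTO {table} ({', '.join(columns)}) VALUES"


print(insert_sql("core_customer_test"))
print(doc_to_row({"_id": "abc", "name": "test"}))
```

Keeping the column list in one place means the INSERT statement and the row values cannot drift out of order when a column is added.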
3.1 Full sync script
import pymongo
from clickhouse_driver import Client

# ClickHouse connection (host and credentials are placeholders)
mydb = Client(
    host="clickhouse_ip.clickhouse.ads.aliyuncs.com",
    user="root_db",
    password="xxxxxx",
    database="mongodb",
)

# MongoDB connection
client = pymongo.MongoClient('mongodb://root:xxxxxxx..@172.19.144.111:27018/')
db = client['sjzx']
collection = db['core_customer']

# The column order here must match the order of values in each converted row
insert_query = (
    'INSERT INTO core_customer_test (_id, birthday, create_time, dep_id, is_delete, name, qx_name, sex, '
    'update_time, video_add_time, lunar_birthday, invitor_userid, external_userid, follow_user) VALUES'
)

batch_size = 200
skip_num = 0
count = collection.count_documents({})
print(count)

while skip_num < count:
    clickhouse_data = []
    # Sort by _id so that the skip/limit pages stay stable between queries
    dd = collection.find().sort('_id', 1).skip(skip_num).limit(batch_size)
    for row in dd:
        # Every value is converted to str because all target columns are String
        converted_row = [
            str(row["_id"]),
            str(row.get("birthday", '')),
            str(row.get("create_time", '')),
            str(row.get("dep_id", '')),
            str(row.get("is_delete", '')),
            str(row.get("name", '')),
            str(row.get("qx_name", '')),
            str(row.get("sex", '')),
            str(row.get("update_time", '')),
            str(row.get("video_add_time", '')),
            str(row.get("lunar_birthday", '')),
            str(row.get("invitor_userid", '')),
            str(row.get("external_userid", '')),
            str(row.get("follow_user", '')),
        ]
        clickhouse_data.append(converted_row)
    # Insert the whole page in one call; skip the call if the page is empty
    if clickhouse_data:
        mydb.execute(insert_query, clickhouse_data, types_check=True)
    skip_num += batch_size

mydb.disconnect()

3.2 Incremental sync script
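The incremental sync is tentatively scheduled for 1:00 a.m. each day and syncs all of the previous day's data. It runs at a fixed time as a scheduled task. For example, at 1:00 a.m. on 2023-4-13 it syncs the data for the whole day of 2023-4-12, and so on.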
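The "previous day" window described above can be computed on its own, which makes it easy to check. This is a minimal sketch with hypothetical helper names (previous_day_window, previous_day_query). It uses a half-open interval, from yesterday's midnight up to but not including today's midnight, so that a timestamp in the last second of the day, such as 23:59:59.500, is still covered.

```python
from datetime import datetime, time, timedelta


def previous_day_window(now):
    """Return (start, end) covering the calendar day before `now`.

    start is yesterday 00:00:00 and end is today 00:00:00 (exclusive).
    """
    today_start = datetime.combine(now.date(), time.min)
    return today_start - timedelta(days=1), today_start


def previous_day_query(now, field="create_time"):
    """Build a MongoDB filter selecting documents from the previous day."""
    start, end = previous_day_window(now)
    return {field: {"$gte": start, "$lt": end}}


# A run at 01:00 on 2023-4-13 selects the whole of 2023-4-12
print(previous_day_query(datetime(2023, 4, 13, 1, 0, 0)))
```

This assumes that create_time is stored on the same clock as the one used to compute the window. If the stored values are in UTC and the job runs on local time, the window would need to be shifted accordingly.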
import pymongo
from clickhouse_driver import Client
from datetime import datetime, timedelta

# ClickHouse connection (host and credentials are placeholders)
mydb = Client(
    host="clickhouse_ip.clickhouse.ads.aliyuncs.com",
    user="root_db",
    password="xxxxxxx",
    database="mongodb",
)

# MongoDB connection
client = pymongo.MongoClient('mongodb://root:xxxxx..@172.19.144.111:27018/')
db = client['sjzx']
collection = db['core_customer']

# Yesterday's window: from yesterday 00:00:00 up to (not including) today 00:00:00
today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
yes_date_start = today_start - timedelta(days=1)
myquery = {'create_time': {'$gte': yes_date_start, '$lt': today_start}}

# The column order here must match the order of values in each converted row
insert_query = (
    'INSERT INTO core_customer_test (_id, birthday, create_time, dep_id, is_delete, name, qx_name, sex, '
    'update_time, video_add_time, lunar_birthday, invitor_userid, external_userid, follow_user) VALUES'
)

batch_size = 200
skip_num = 0
# Count only the documents that match the window, not the whole collection
count = collection.count_documents(myquery)
print(count)

while skip_num < count:
    clickhouse_data = []
    # Sort by _id so that the skip/limit pages stay stable between queries
    dd = collection.find(myquery).sort('_id', 1).skip(skip_num).limit(batch_size)
    for row in dd:
        # Every value is converted to str because all target columns are String
        converted_row = [
            str(row["_id"]),
            str(row.get("birthday", '')),
            str(row.get("create_time", '')),
            str(row.get("dep_id", '')),
            str(row.get("is_delete", '')),
            str(row.get("name", '')),
            str(row.get("qx_name", '')),
            str(row.get("sex", '')),
            str(row.get("update_time", '')),
            str(row.get("video_add_time", '')),
            str(row.get("lunar_birthday", '')),
            str(row.get("invitor_userid", '')),
            str(row.get("external_userid", '')),
            str(row.get("follow_user", '')),
        ]
        clickhouse_data.append(converted_row)
    # Insert the whole page in one call; skip the call if the page is empty
    if clickhouse_data:
        mydb.execute(insert_query, clickhouse_data, types_check=True)
    skip_num += batch_size

mydb.disconnect()

IV. Check the data in ClickHouse
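A quick way to check the result is to compare row counts on both sides. The following is a minimal sketch, not part of the original scripts: compare_counts is a hypothetical helper that takes a pymongo collection and a clickhouse_driver Client as arguments, and it assumes that execute returns a list of tuples for a SELECT.

```python
def compare_counts(mongo_collection, ch_client, table, mongo_filter=None):
    """Return (mongo_count, ch_count, equal) for a collection and a CK table."""
    # Number of source documents, optionally restricted by a filter
    mongo_count = mongo_collection.count_documents(mongo_filter or {})
    # count() yields a single row with a single value
    ch_count = ch_client.execute(f"SELECT count() FROM {table}")[0][0]
    return mongo_count, ch_count, mongo_count == ch_count
```

With the objects from the scripts above, this could be called as compare_counts(collection, mydb, "mongodb.core_customer_test"). Matching counts do not prove that every field was converted correctly, but a mismatch is a fast signal that a batch was skipped or inserted twice.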
Finally, my thanks to 郑照辉 (Zheng Zhaohui) for the help.