当前位置 : 主页 > 编程语言 > python >

Python采用并发查询mysql以及调用API灌数据 (七)- 字典合并处理以及并发实现分

来源:互联网 收集:自由互联 发布时间:2022-07-13
前情回顾 上一篇文章已经编写了解决datetime类型需要序列化的问题,那么本章节我们来继续编写循环请求API灌入数据,以及并发实现的初步分析。 实战任务 本次因为服务架构重构,表

Python采用并发查询mysql以及调用API灌数据 (七)- 字典合并处理以及并发实现分析..._字段


前情回顾

上一篇文章已经编写了解决datetime类型需要序列化的问题,那么本章节我们来继续编写循环请求API灌入数据,以及并发实现的初步分析。

实战任务

本次因为服务架构重构,表优化、重构,带来的任务就是需要从原来的mysql数据库中,读取原表数据(部分存在多张关联查询)然后通过调用API的服务方式灌入新的数据库表中(包含mysql、mongodb)。

执行流程如下


Python采用并发查询mysql以及调用API灌数据 (七)- 字典合并处理以及并发实现分析..._字段_02


那么根据流程所需要的功能,需要以下的实例进行支撑:
1.并发实例
2.查询数据实例
3.执行post请求实例

目标:循环请求API灌入数据以及并发实现分析

循环请求API示例

在编写执行API请求之前,首先在查询过程有些特俗的字段需要加入api_body中,添加插入的数据,那么该如何处理呢?

合并dict字典数据

In [34]: dict1 = {'field1':'1'}

In [35]: dict2 = {'field2':'2'}

In [36]:

In [36]: dict3 = dict( dict1, **dict2 )

In [37]: print dict3
{'field2': '2', 'field1': '1'}

In [38]:

在model方法加入该功能,并循环请求API

在models.py增加方法:

# 根据查询的结果以及字段字典,转化为请求API的body,最后再合并一个特殊数据的字典
def convertMergeApiBody(self,result,dict_fields,special_fields):
# 循环生成每条查询数据的请求body
body = {}
for result in result:
for field in result:
if field == "null":
body[field] = None
else:
body[field] = result[field]
# 更新body的字段为新表的字段
new_body = {}
for key, value in dict_fields.items():
# print "key = %s , value = %s" % (key, value)
if key == "null":
new_body[value] = None
elif isinstance(body[key],datetime.datetime): # 将datetime类型转str,解决json的序列化问题
new_body[value] = body[key].strftime("%Y-%m-%d %H:%M:%S")
else:
new_body[value] = body[key]

# 合并特俗数据字典
api_body = dict( new_body , **special_fields)

return api_body

编写循环请求API方法:

# -*- coding: utf-8 -*-

from tools.MysqlTools import MysqldbHelper
import pymysql
from tools.PostTools import PostHelper
from core.models import ModelHelper
import datetime
import urllib2,json

if __name__ == "__main__":

# 定义数据库访问参数
config = {
'host': '######注释#########',
'port': 3361,
'user': 'root',
'passwd': '######注释#########',
'charset': 'utf8',
'cursorclass': pymysql.cursors.DictCursor
}
# 初始化数据模型
model = ModelHelper(config)
# 设置需要查询的数据库
DB_NAME = '######注释#########'
# 设置需要查询的表明
TABLE_NAME = '######注释#########'
# 设置字段映射字典: 旧表查询字段 ==》 新表的字段
# "id": "id",
dict_fields = {
######注释#########
}

# 特殊字段,用于合并api_body请求中
special_fields = {
######注释#########
}

# 获取旧表字段数组
select_fields = model.getSelectFields(dict_fields)
# 获取查询旧表数据数组
select_order = "order by id desc limit 2000"
select_result = model.selectTable(DB_NAME,TABLE_NAME,select_fields,select_order)
print "select_result="
print type(select_result)
print select_result
print
result_row = []
for row in select_result:
result_row.append(row) # 将查询结果加入list中
# print "result=",result_row[0]

# 生成API请求body
api_body = model.convertMergeApiBody(result_row, dict_fields ,special_fields)
print "api_body=", api_body, type(api_body)
# print
# 定义请求参数
url='http://######注释#########'
model.postInsertData(url=url,body=api_body)

result_row.pop(0) # 将查询结果剔除list中,保证传入api_body的参数只有一个字典的list

执行上面的代码,已经可以循环请求API请求灌入数据了。
但是插入的时间比较长。

可以使用一个time方法来计算一下耗时:

import time

start = time.clock()

#当中是你的程序

elapsed = (time.clock() - start)
print("Time used:",elapsed)

耗时如下:

('Time used:', 113.28491424571229)

使用了113秒,接近2分钟。这个效率是不能满足我们快速进行数据迁移的。那么下一步就是要考虑如何并发高效处理这些数据了。

那么下面来分析一下,哪个步骤耗时比较长。


Python采用并发查询mysql以及调用API灌数据 (七)- 字典合并处理以及并发实现分析..._sed_03


看看代码,可以知道这个循环是需要等待每次API请求后,返回结果再进行下一个循环执行的。

# 开始定时
start = time.clock()
result_row = []
for row in select_result:
result_row.append(row) # 将查询结果加入list中
# print "result=",result_row[0]

# 生成API请求body
api_body = model.convertMergeApiBody(result_row, dict_fields ,special_fields)
print "api_body=", api_body, type(api_body)
# print
# 定义请求参数
url='http://######注释############'
model.postInsertData(url=url,body=api_body)

result_row.pop(0) # 将查询结果剔除list中,保证传入api_body的参数只有一个字典的list

# 结束计时
elapsed = (time.clock() - start)
print("Time used:",elapsed)

那么将这个耗时较长请求API的工作进行异步并发,是否就可以解决问题了呢?

代码优化 - 构建生产者和消费者


Python采用并发查询mysql以及调用API灌数据 (七)- 字典合并处理以及并发实现分析..._数据_04


根据这个处理图,首先将代码优化生产者和消费者两部分方法,然后再进行调用。

# 定义生产者: 返回查询mysql数据
def produce(model,DB_NAME,TABLE_NAME,dict_fields,select_order):
# 获取旧表字段数组
select_fields = model.getSelectFields(dict_fields)
# 获取旧表查询数据
select_result = model.selectTable(DB_NAME, TABLE_NAME, select_fields, select_order)
return select_result

# 定义消费者:直接API请求
def consume(row,url,model):
result_row.append(row) # 将查询结果加入list中
# print "result=",result_row[0]
# 生成API请求body
api_body = model.convertMergeApiBody(result_row, dict_fields, special_fields)
model.postInsertData(url=url, body=api_body)
result_row.pop(0) # 将查询结果剔除list中,保证传入api_body的参数只有一个字典的list# 调用生成mysql查询数据
select_result = produce(model,DB_NAME,TABLE_NAME,dict_fields,select_order)

result_row = []
for row in select_result:
consume(row, url, model) # 消费请求API

好了,下面来看看怎么并发异步处理消费方法的这部分。

首先看一个并发异步的调用示例

参考:python 实现异步执行

#coding:utf-8
from threading import Thread
from time import sleep

def async(f):
def wrapper(*args, **kwargs):
thr = Thread(target = f, args = args, kwargs = kwargs)
thr.start()
return wrapper

@async
def A():
sleep(2)
print "a function"

def B():
print "b function"

if __name__=='__main__':
i = 0
for i in range(0,5):
A()
B()
print "i=",i

分析一下:
定义了一个装饰器 async 和 A 、B 两个function
A 里面sleep 2s , 然后打印 a function 字符串
B 里面直接打印 b function 字符串
我们循环调用两个功能,查看打印数据的时间:

b function
i= 0
b function
i= 1
b function
i= 2
b function
i= 3
b function
i= 4
a functiona function

a functiona function

a function

基本上 a function 的打印是同时出现的,所以sleep方法并没有阻塞其他A()方法的请求。
很好,这个就满足了我们消费者的这个场景。

将装饰器 async引入消费者,进行使用


Python采用并发查询mysql以及调用API灌数据 (七)- 字典合并处理以及并发实现分析..._字段_05


加了这个异步处理修饰器之后,CPU就快速飞涨。效率迅速提升,但是由于对端API处理效率不足,也会出现报错的情况。

对于这种报错的情况,其实只要给数据库增加一个is_import的字段,作为保证字段。每调用成功一个API,那么就修改一下这个is_import字段为1,那么下次只查询is_import为0的数据插入,这样就可以保证数据插入失败后能够再次查询插入了。

但是还要考虑一下,如果我循环调用这个异步,第一次查询2000左右的数据,第二次再查询2000的数据,这两份数据是否存在交集的情况,此时应该就要使用加锁来进行处理了。

下一篇章,继续讲解加锁的处理方法。


Python采用并发查询mysql以及调用API灌数据 (七)- 字典合并处理以及并发实现分析..._字段_06


关注微信公众号,回复【资料】、Python、PHP、JAVA、web,则可获得Python、PHP、JAVA、前端等视频资料。


网友评论