# Python对接支付宝支付自实现
# -*- coding: utf-8 -*-
"""Small hand-rolled client for the Alipay OpenAPI gateway.

Provides request signing with the application's RSA private key, signature
verification of gateway responses and notifications with a PEM public key,
and a helper that issues a signed GET request against the gateway.
"""
import base64
import json
import urllib.parse
from datetime import datetime

import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding


class AliPayException(Exception):
    """Base error for this module; ``data`` holds the offending payload."""

    def __init__(self, data):
        # Forward the payload so ``args``/``repr`` are meaningful.
        super(AliPayException, self).__init__(data)
        self.data = data

    def __str__(self):
        return "alipay - {}".format(self.data)

    def __unicode__(self):
        return u"alipay - {}".format(self.data)


class AliPayVerifyException(AliPayException):
    """Raised when a response cannot be prepared for signature verification."""

    def __init__(self, msg, data):
        super(AliPayVerifyException, self).__init__(
            'alipay verify except - {}:{}'.format(msg, data))


class AliPay:
    """Signs gateway requests and verifies gateway signatures."""

    def __init__(self, **kwargs):
        """Configure the client and load both PEM keys from disk.

        :param kwargs:
            app_id: application id sent as the ``app_id`` request parameter (required)
            seller_id: seller id, exposed through the ``seller_id`` property (required)
            gateway_url: gateway endpoint the signed query string is appended to (required)
            notify_url: optional ``notify_url`` request parameter
            sign_type: ``'RSA'`` (SHA1) or ``'RSA2'`` (SHA256); defaults to ``'RSA2'``
            app_private_key: path to the PEM private key used for signing (required)
            public_key: path to the PEM public key used for verification (required)
            timeout: optional timeout forwarded to ``requests.get``; ``None``
                (the default) keeps the library's default of no timeout
        :raises KeyError: a required key is missing
        :raises ValueError: ``sign_type`` is neither ``'RSA'`` nor ``'RSA2'``
        """
        self._app_id = kwargs['app_id']
        self._seller_id = kwargs['seller_id']
        self._gateway_url = kwargs['gateway_url']
        self._notify_url = kwargs.get('notify_url')
        self._timeout = kwargs.get('timeout')
        self._sign_type = kwargs.get('sign_type', 'RSA2')
        if self._sign_type not in ('RSA', 'RSA2'):
            raise ValueError('alipay sign_type must `RSA` or `RSA2`')
        self._charset = 'utf-8'
        self._format = 'json'
        # Read the PEM files as bytes: the loaders take bytes, and this avoids
        # a decode/encode round trip through the platform default encoding.
        with open(kwargs['app_private_key'], 'rb') as f:
            self._app_private_key = serialization.load_pem_private_key(
                f.read(), None, default_backend())
        with open(kwargs['public_key'], 'rb') as f:
            self._public_key = serialization.load_pem_public_key(
                f.read(), default_backend())

    @property
    def app_id(self):
        return self._app_id

    @property
    def seller_id(self):
        return self._seller_id

    def _hash_algorithm(self):
        """Return the digest matching ``sign_type`` (RSA -> SHA1, RSA2 -> SHA256)."""
        return hashes.SHA1() if self._sign_type == 'RSA' else hashes.SHA256()

    @staticmethod
    def sort_data(data):
        """Return the ``(key, value)`` pairs of ``data`` ordered by key."""
        return [(k, data[k]) for k in sorted(data.keys())]

    @staticmethod
    def _signing_string(data):
        """Join ``data`` into the ``k1=v1&k2=v2`` string that gets signed."""
        return '&'.join(
            '{}={}'.format(key, value) for key, value in AliPay.sort_data(data))

    def app_private_sign(self, data):
        """Sign the text ``data`` with the private key.

        :return: base64-encoded PKCS#1 v1.5 signature as ``str``
        """
        signature = self._app_private_key.sign(
            data.encode('utf8'), padding.PKCS1v15(), self._hash_algorithm())
        return base64.b64encode(signature).decode('utf8')

    def sync_verify(self, method, raw_data):
        """Verify the signature of a synchronous gateway response.

        The signed content is the exact raw text of the ``<method>_response``
        node, so it is sliced out of the undecoded body instead of being
        re-serialized. Expects the compact layout
        ``{"<method>_response":<node>,"sign":"<base64>"}`` with ``sign`` last.

        :param method: API method name, e.g. ``alipay.trade.precreate``
        :param raw_data: raw response body as ``bytes``
        :raises AliPayVerifyException: response node or ``sign`` field not found
        :raises cryptography.exceptions.InvalidSignature: signature mismatch
        """
        node = method.replace('.', '_') + '_response'
        text = raw_data.decode('utf8')
        # Anchor on the quoted key: a bare 'sign' may also occur inside the
        # base64 signature itself, which never contains a double quote.
        sign_marker = '"sign":"'
        sign_start = text.rfind(sign_marker)
        node_start = text.find(node)
        if sign_start == -1 or node_start == -1:
            raise AliPayVerifyException('malformed response', text)
        # Drop the trailing `"}` that closes the sign value and the envelope.
        signature = base64.b64decode(text[sign_start + len(sign_marker):-2])
        # Skip the `":` after the node name; stop before the `,` preceding "sign".
        signed_content = text[node_start + len(node) + 2:sign_start - 1]
        self._public_key.verify(
            signature,
            signed_content.encode('utf8'),
            padding.PKCS1v15(),
            self._hash_algorithm())

    def async_verify(self, data):
        """Verify the signature of an asynchronous notification.

        :param data: mapping of notification parameters, including ``sign``
        :raises KeyError: ``data`` has no ``sign`` entry
        :raises cryptography.exceptions.InvalidSignature: signature mismatch
        """
        # `sign` and `sign_type` are excluded from the signed content.
        unsigned = {
            k: v for k, v in data.items() if k not in ('sign', 'sign_type')}
        self._public_key.verify(
            base64.b64decode(data['sign']),
            self._signing_string(unsigned).encode('utf8'),
            padding.PKCS1v15(),
            self._hash_algorithm())

    def params(self, method, biz_content):
        """Build the signed, URL-quoted query string for ``method``.

        :param method: API method name
        :param biz_content: JSON-serializable business parameters
        :return: ``k=v&...&sign=...`` query string
        """
        data = {
            'app_id': self._app_id,
            'method': method,
            'format': self._format,
            'charset': self._charset,
            'sign_type': self._sign_type,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),  # yyyy-MM-dd HH:mm:ss
            'version': '1.0',
            'biz_content': json.dumps(biz_content, separators=(',', ':')),
        }
        if self._notify_url:
            data['notify_url'] = self._notify_url
        # The signature covers the unquoted values; quoting is applied only
        # when the query string is assembled.
        sign = self.app_private_sign(self._signing_string(data))
        pairs = self.sort_data(data) + [('sign', sign)]
        return '&'.join(
            '{}={}'.format(key, urllib.parse.quote(value)) for key, value in pairs)

    def command(self, method, biz_content):
        """Call ``method`` on the gateway and return its response node.

        :raises AliPayException: the response ``code`` is not ``'10000'``
        :raises AliPayVerifyException: the response cannot be parsed for verification
        :raises cryptography.exceptions.InvalidSignature: signature mismatch
        """
        query = self.params(method, biz_content)
        response = requests.get(
            '{}?{}'.format(self._gateway_url, query), timeout=self._timeout)
        raw_body = response.content
        node = response.json()[method.replace('.', '_') + '_response']
        # NOTE(review): error payloads are raised without signature
        # verification, as before; only successful responses are verified.
        if node.get('code', '10000') != '10000':
            raise AliPayException(node)
        self.sync_verify(method, raw_body)
        return node


if __name__ == '__main__':
    alipay = AliPay(**{
        'app_id': '...',
        'seller_id': '...',
        'gateway_url': 'https://openapi.alipaydev.com/gateway.do',
        'notify_url': '...',
        'app_private_key': 'path to private_key.pem',
        'public_key': 'path to public_key.pem'
    })
    biz_content = {
        'out_trade_no': "111",
        'total_amount': 0.01,
        'subject': "test",
    }
    alipay.command('alipay.trade.precreate', biz_content)