跳到主要内容

Demo - Python

使用 Python 编程语言编写的请求示例。


rsa_util

requirements.txt

pycryptodome~=3.20.0
requests~=2.32.2

工具代码

import base64

from Crypto.Cipher import PKCS1_v1_5 as Cipher
from Crypto.Cipher.PKCS1_v1_5 import PKCS115_Cipher
from Crypto.Hash import SHA256
from Crypto.Hash.SHA256 import SHA256Hash
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5 as Signature


def import_key(key_str: str) -> RSA:
if not key_str.startswith('-----') and not key_str.endswith('-----'):
key_str = '-----BEGIN KEY-----\n%s\n-----END KEY-----' % key_str
key_bytes: bytes = key_str.encode()
return RSA.importKey(key_bytes)


def public_key(key_str: str) -> RSA:
key: RSA = import_key(key_str)
assert not key.has_private()
return key


def private_key(key_str: str) -> RSA:
key: RSA = import_key(key_str)
assert key.has_private()
return key


def encrypt(key: RSA, plains: str) -> str:
assert not key.has_private()
size: int = key.size_in_bits()
block_size: int = (size // 8) - 11
crypto_cipher: PKCS115_Cipher = Cipher.new(key)
plains_bytes: bytes = plains.encode('utf-8')
cipher: bytes = b""
for i in range(0, len(plains_bytes), block_size):
block: bytes = plains_bytes[i:i + block_size]
cipher += crypto_cipher.encrypt(block)
return base64.b64encode(cipher).decode('utf-8')


def decrypt(key: RSA, cipher: str) -> str:
assert key.has_private()
size: int = key.size_in_bits()
block_size: int = size // 8
crypto_cipher: PKCS115_Cipher = Cipher.new(key)
cipher_bytes: bytes = base64.b64decode(cipher)
plains_bytes: bytes = b""
for i in range(0, len(cipher_bytes), block_size):
block: bytes = cipher_bytes[i:i + key.size_in_bytes()]
plains_bytes += crypto_cipher.decrypt(block, None)
return plains_bytes.decode('utf-8')


def sign(key: RSA, cipher: str) -> str:
assert key.has_private()
cipher_hash: SHA256Hash = SHA256.new(cipher.encode("utf-8"))
sign_bytes: bytes = Signature.new(key).sign(cipher_hash)
return base64.b64encode(sign_bytes).decode('utf-8')


def verify(key: RSA, cipher: str, sign_str: str):
assert not key.has_private()
cipher_hash: SHA256Hash = SHA256.new(cipher.encode("utf-8"))
sign_data = base64.b64decode(sign_str.encode("utf-8"))
return Signature.new(key).verify(cipher_hash, sign_data)


fire_demo

import datetime
import json
import uuid
from unittest import TestCase

import requests

import rsa_util


class Test(TestCase):
# 服务商(agency)的编号
# 此处使用 agency_no = 100000000000000000 作为模拟
# 真实请求场景,请咨询 ice-run 的技术支持获取正式的 服务商编号 agency_no
AGENCY_NO = "100000000000000000"

# 安全算法 RSA
SECURITY = "RSA"

# ice-run 的 公钥
PUBLIC_KEY = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC78QqLKTkVcwvB2Scm4IoImOXmIthVmx4HHr5Im09aol2r2UlMK3jl4lamdxktvjFswWx41hQBD4tyYFHTY+mQJsRK8Zwm7LZmLJDHcgibIuLxEddln+MI+rWwT+SOdOt7lBvd9PcO4nYr/3Nqkyk8L4PrT+GF7dpVk8iE/VpTwQIDAQAB"

# agency 的 私钥
PRIVATE_KEY = "MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBAOyC76Y+/2NqkkIVUtAHkbZkBYI2XXkuqE9o4oIDkbyAY04jC5MLa3oVc3uGG7T/Elodo9dIbIxNh7/ir7GQLR/hY+ookHAewurJ1z4Jn5TQeRk3+nYQcX6a6orfUOMffwcn0Jtat8D+cNAu3uCi+N7Ez/yNpItNORYqTa8AkgzrAgMBAAECgYAS/rsfl3ysb+E6RHsnsQvvYZ4dpJ8iPfCPnCVg+sdoI8mV+3ORBkBGCFYDjDRKd5fyO+IuRqdNJ2bpLtwcfy9YchcV/x40GIuAlre6A6qt4Raq+0l4FJufaqgdmSiLxd5zPX79gdMvBqTbdYzCSDGP5Pf3pTqIS3iKvSixFTQBzQJBAO6k6rE6JE/EHrbRjVtZNitDZ8rouCL0s9nGsgIDBSfgOCClO+HlZKcHq6W9ry8j1gOFeGH2rF1yMslA/ZymRL0CQQD9tk/gk9Xs1Eg1if7SLwvNxUP+z96oEqQTF26tSudPRwOO88fnfE4ZZ4heBt1QI+s7IhG39qecsFW269nvKfbHAkEArXmEgUBalQFjslGyB+1Zyyk8keuJrx9ifbRKQdwgK1R6eICkfxlZiXGx/NFeP041jGnBkLTXpzYUZOexc+YJoQJBANR3r87vnxAU+l+zr62O7oCk+XtT0y/HZJYEYpBHEQyn+MfnSXqG89R8iovLjd0GJ4E+173KlrU2SqHEQ57w8pMCQAEoDjFUXpYzBl8e0M9XsSpGY531mbOl8ASnVp4OSyavaCsn56eEkL9TI5q69KrYUEA4PN3BazMWfll2s/wu6G0="

# ice-run 的 test 环境的 API 网关 的域名地址
# 正式生产环境的域名地址请查阅 ice-run 的技术文档
DOMAIN = "test-fire-api.ice.run"

# 示例地址,仅用于测试加密解密加签验签流程,无任何实际业务场景
DEMO = "/demo/api/demo"

def test_demo(self):
# 对方公钥
public_key = rsa_util.public_key(self.PUBLIC_KEY)

# 己方私钥
private_key = rsa_util.private_key(self.PRIVATE_KEY)

# 此处使用 `${URL}` 模拟请求地址
url = "https://" + self.DOMAIN + self.DEMO
print("request url :", url)

# 请求参数:通常为复杂对象。
# 此处使用 json 对象 模拟,实际场景可以定义数据模型类
# 注意:不可传 null 或 空字符串 或 单值 ( string, number, boolean )
# 如果当前业务接口不需要传递具体参数,可以传空对象参数 `{}`,json 序列化为 `{"param":{}}`
param_object = {"name": "喵喵汪汪"}

# 参数明文:参数对象的 json 序列化后的字符串的明文
param_plains = json.dumps(param_object)
print("param plains :", param_plains)

# 参数密文:agency 使用 ice-run 公钥 对参数明文加密后得到的密文
param_cipher = rsa_util.encrypt(public_key, param_plains)
print("param cipher :", param_cipher)

# 请求签名:agency 使用 私钥 对参数密文进行签名
# 当 ice-run 收到请求时,将会使用 agency 公钥 进行验签
sign = rsa_util.sign(private_key, param_cipher)
print("request sign :", sign)

# 请求数据 request body
# 此处使用 json 对象 模拟,实际场景建议定义数据模型类
# body 数据的 键 固定为 param ,值为 json 序列化后的字符串 RSA 加密后的密文
request_object = {"param": param_cipher}
request_body = json.dumps(request_object)
print("request body :", request_body)

# 请求时间,当前世界标准时间的字符串,格式为 RFC-1123 标准,服务器会校验时间差,不允许超过设置的时间间隔(默认为 2 分钟)
time = datetime.datetime.now(datetime.UTC).strftime('%a, %d %b %Y %H:%M:%S GMT')

# 链路追踪号(防止重放攻击的请求编号)
# 单位时间内(默认为 1 分钟)不重复的字符串,建议使用 uuid 或者时间戳+随机数。限制长度 = 32。正则表达式 = `^[0-9a-f]{32}$`
trace = uuid.uuid4().hex

# 请求头
headers = {
"Content-Type": "application/json",
"X-Client": self.AGENCY_NO,
"X-Security": self.SECURITY,
"X-Sign": sign,
"X-Time": time,
"X-Trace": trace
}
print("request header :", headers)

# http 客户端。此处使用 requests 库模拟,实际场景建议自行选择 HTTP 客户端组件包
# 发起 HTTP 请求之前,请确认 `${DOMAIN}` 可以正常访问,部分客户端的服务器环境可能需要设置请求代理
response = requests.post(url, data=request_body, headers=headers)

# 响应状态码
status_code = response.status_code
print("response status :", status_code)
assert status_code == 200

# 响应 header
print("response header :", response.headers)

sign = response.headers.get("X-Sign")
print("response sign :", sign)

# 响应 body json 格式的字符串
response_body = response.text
print("response body :", response_body)

# 响应数据 response body
# 此处使用 json 对象 模拟,实际场景建议定义数据模型类
# 数据的 键 有 code 和 message 和 data
# 其中 data 为 json 序列化后的字符串 RSA 加密后的密文
response_object = response.json()

# 响应码,默认 000000 表示请求成功
code = response_object["code"]
print("response code :", code)
assert code == "000000"

# 响应 数据密文
data_cipher = response_object["data"]
print("data cipher :", data_cipher)

# 响应验签:ice-run 处理响应时,会使用 ice-run 私钥对响应密文进行签名
# agency 收到响应后,需要使用 ice-run 公钥进行验签
verify = rsa_util.verify(public_key, data_cipher, sign)
if not verify:
raise Exception("verification failed")

# 响应 数据明文
data_plains = rsa_util.decrypt(private_key, data_cipher)
print("data plains :", data_plains)

data = json.loads(data_plains)