Skip to content

Instantly share code, notes, and snippets.

@sunziping2016
Created April 9, 2023 09:58
Show Gist options
  • Select an option

  • Save sunziping2016/886c9dd1d7de051e58e132b5697144c8 to your computer and use it in GitHub Desktop.

Select an option

Save sunziping2016/886c9dd1d7de051e58e132b5697144c8 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from datetime import datetime
import os
from wsgiref.handlers import format_date_time
from time import mktime
import hashlib
import base64
import hmac
from urllib.parse import urlencode, urlparse
import json
import requests
import sys
import subprocess
import tempfile
import regex as re
class XFClient:
    """Small HTTP client for the ``s9a87e3ec`` service on ``api.xf-yun.com``.

    Requests are authenticated with an HMAC-SHA256 signature over the host,
    the current date and the request line; the signature travels in the
    query string of the request URL (see :meth:`assemble_url`).
    """

    # Endpoint every request is posted to.
    ENDPOINT = 'https://api.xf-yun.com/v1/private/s9a87e3ec'
    # Seconds to wait for the service before giving up; without a timeout
    # ``requests`` would block indefinitely on an unresponsive server.
    timeout = 60

    def __init__(self, appid: str, api_secret: str, api_key: str) -> None:
        """Store the credentials and prepare the signed URL and headers.

        Args:
            appid: Application id, sent in the headers and request body.
            api_secret: Secret used as the HMAC key for the signature.
            api_key: Key identifier embedded in the authorization string.
        """
        self.appid = appid
        # Kept so that every request can be signed with a fresh date.
        self._api_key = api_key
        self._api_secret = api_secret
        self.request_url = XFClient.assemble_url(
            self.ENDPOINT, "POST", api_key, api_secret)
        self.headers = {'content-type': "application/json",
                        'host': 'api.xf-yun.com', 'app_id': self.appid}

    def sha256base64(self, data: bytes) -> str:
        """Return the base64-encoded SHA-256 digest of ``data``."""
        return base64.b64encode(hashlib.sha256(data).digest()).decode()

    @staticmethod
    def assemble_url(request_url: str, method: str = "POST",
                     api_key: str = "", api_secret: str = "") -> str:
        """Return ``request_url`` with signed authentication query parameters.

        The signature is an HMAC-SHA256 (keyed with ``api_secret``) over the
        host, the current date and the HTTP request line. It is wrapped in
        an authorization string that is base64-encoded and appended to the
        URL together with ``host`` and ``date``.

        Args:
            request_url: Full URL of the endpoint to call.
            method: HTTP method that will be used for the request.
            api_key: Key identifier placed in the authorization string.
            api_secret: Secret used as the HMAC key.

        Returns:
            The URL with ``host``, ``date`` and ``authorization`` parameters.
        """
        url = urlparse(request_url)
        host, path = url.netloc, url.path
        # Take the POSIX timestamp straight from the datetime instead of
        # round-tripping through timetuple()/mktime().
        date = format_date_time(datetime.now().timestamp())
        sig_origin = f'host: {host}\ndate: {date}\n{method} {path} HTTP/1.1'
        sig_sha = hmac.new(api_secret.encode(), sig_origin.encode(),
                           digestmod=hashlib.sha256).digest()
        sig_sha = base64.b64encode(sig_sha).decode()
        auth_origin = (
            f'api_key="{api_key}", algorithm="hmac-sha256", '
            f'headers="host date request-line", signature="{sig_sha}"'
        )
        auth = base64.b64encode(auth_origin.encode()).decode()
        values = {
            "host": host,
            "date": date,
            "authorization": auth,
        }
        return request_url + "?" + urlencode(values)

    def get_body(self, text: str) -> dict:
        """Build the JSON-serializable request body carrying ``text``.

        The text is sent base64-encoded (from its UTF-8 bytes) in
        ``payload.input.text``; the result is requested as raw UTF-8 JSON.
        """
        encoded_text = base64.b64encode(text.encode("utf-8")).decode()
        return {
            "header": {
                "app_id": self.appid,
                "status": 3,
            },
            "parameter": {
                "s9a87e3ec": {
                    "result": {
                        "encoding": "utf8",
                        "compress": "raw",
                        "format": "json",
                    },
                },
            },
            "payload": {
                "input": {
                    "encoding": "utf8",
                    "compress": "raw",
                    "format": "plain",
                    "status": 3,
                    "text": encoded_text,
                },
            },
        }

    def get_result(self, text: str):
        """Send ``text`` to the service and return the decoded result.

        Returns:
            The object obtained by base64-decoding and JSON-parsing
            ``payload.result.text`` of the response.

        Raises:
            SystemExit: With status 1 when the response has no ``payload``
                key; the raw response is printed to stderr first.
        """
        body = self.get_body(text)
        # The signed URL embeds the signing date, so sign again for every
        # request instead of reusing the URL created in __init__.
        # NOTE(review): the service presumably rejects stale dates -
        # confirm the allowed clock skew in the API documentation.
        self.request_url = XFClient.assemble_url(
            self.ENDPOINT, "POST", self._api_key, self._api_secret)
        response = requests.post(
            self.request_url, data=json.dumps(body), headers=self.headers,
            timeout=self.timeout)
        reply = json.loads(response.content.decode())
        if 'payload' not in reply:
            print(reply, file=sys.stderr)
            # sys.exit rather than the builtin exit, which is only injected
            # by the site module and is not guaranteed to exist.
            sys.exit(1)
        return json.loads(
            base64.b64decode(reply['payload']['result']['text']).decode())
def apply_corrections(text, result):
    """Return ``text`` with every replacement listed in ``result`` applied.

    ``result`` is a mapping whose values are sequences of
    ``(pos, cur, correct, description)`` entries: ``pos`` is used as an
    index into ``text``, ``cur`` is the span being replaced (only its length
    is used) and ``correct`` is the replacement. ``description`` is ignored.
    """
    edits = sorted(entry for entries in result.values() for entry in entries)
    # Apply from the end of the text backwards so that a replacement of a
    # different length does not shift the positions of the remaining edits.
    for pos, cur, correct, _description in reversed(edits):
        text = text[:pos] + correct + text[pos + len(cur):]
    return text


def main():
    """Correct the text on stdin and show the changes with ``delta``.

    Returns:
        The exit status of ``delta``, or 2 when a credential environment
        variable is missing.
    """
    # Credentials come from the environment (obtained from the service
    # console); fail early with a clear message instead of crashing later
    # on a ``None`` secret.
    env_names = ('XF_APP_ID', 'XF_API_SECRET', 'XF_API_KEY')
    missing = [name for name in env_names if not os.environ.get(name)]
    if missing:
        print('missing environment variable(s): ' + ', '.join(missing),
              file=sys.stderr)
        return 2
    demo = XFClient(
        os.environ['XF_APP_ID'],
        os.environ['XF_API_SECRET'],
        os.environ['XF_API_KEY'])
    pattern = re.compile(r'([\p{IsHan}]+)', re.UNICODE)
    # Text to be corrected.
    full_text = sys.stdin.read()
    corrected_parts = []
    for paragraph in full_text.split('\n\n'):
        # Paragraphs without any Han character are passed through untouched
        # and never sent to the service.
        if pattern.search(paragraph) is None:
            corrected_parts.append(paragraph)
            continue
        corrected_parts.append(
            apply_corrections(paragraph, demo.get_result(paragraph)))
    correct_text = '\n\n'.join(corrected_parts)
    # The context manager guarantees the temporary file is closed (and thus
    # removed) even when launching delta fails.
    with tempfile.NamedTemporaryFile() as temp:
        temp.write(full_text.encode())
        temp.flush()
        # delta compares the original (temp file) against the corrected
        # text, which it receives on stdin ('-').
        delta = subprocess.run(
            ['delta', '--word-diff-regex', r'\w',
             '--diff-highlight', temp.name, '-'],
            input=correct_text.encode())
    return delta.returncode


if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment