|
- # -*- coding: utf-8 -*-
- import sys
- import base64
- import hashlib
- import hmac
- import json
- import os
- import time
- import requests
- import urllib
- from pydub import AudioSegment
- import subprocess
- from pathlib import Path
- from openai import OpenAI
-
-
- #优化压缩讯飞识别出来的文字
-
- import json
- from jsonpath import jsonpath
-
def generate_data(filepath, outputfile):
    """Condense a raw transcript JSON file into a compact list of timed segments.

    Reads the JSON document at *filepath*, collects every ``json_1best`` value
    found at any depth, and for each one keeps the first ``bg`` value, the
    first ``ed`` value and the concatenation of all ``w`` values. The result
    is written to *outputfile* as a UTF-8 JSON array of
    ``{"text", "bg_time", "ed_time"}`` objects.

    Args:
        filepath: Path of the raw transcript JSON file to read.
        outputfile: Path of the condensed JSON file to write.

    Returns:
        The list of segment dicts that was written to *outputfile* (empty
        when the input contains no ``json_1best`` entries).

    Raises:
        TypeError: If a segment has no ``bg`` or ``ed`` value at all (the
            lookup yields ``False``, which cannot be indexed).
    """
    with open(filepath, 'r', encoding='utf-8') as fs:
        data = json.load(fs)

    datas = []
    # jsonpath() returns False (not an empty list) when nothing matches, so
    # normalise the result before iterating to avoid crashing on an empty
    # transcript.
    for json_best in jsonpath(data, "$..json_1best") or []:
        # A segment may be stored either as an embedded JSON string or as an
        # already-decoded object; handle both.
        if isinstance(json_best, str):
            json_data = json.loads(json_best)
        else:
            json_data = json_best
        bg_time = jsonpath(json_data, '$..bg')[0]
        ed_time = jsonpath(json_data, '$..ed')[0]
        # A segment without any words yields an empty text instead of a crash.
        text = ''.join(jsonpath(json_data, '$..w') or [])
        datas.append({'text': text, 'bg_time': bg_time, 'ed_time': ed_time})

    with open(outputfile, 'w', encoding='utf-8') as fp:
        json.dump(datas, fp, ensure_ascii=False)
    return datas
-
-
-
- # 视频中提取音频
-
def extract_audio_from_video(video_path, audio_path, speed_factor=1.2, bitrate="96k"):
    """Export the audio track of a video as a mono, sped-up MP3 file.

    The audio is loaded with pydub, down-mixed to one channel, and sped up by
    re-labelling the unchanged raw samples with a higher frame rate.

    NOTE(review): because the speed-up shortens the audio, any timestamps
    derived from the exported file (e.g. by a transcription service) are
    presumably compressed by ``speed_factor`` relative to the original video;
    confirm whether downstream consumers need to rescale them.

    Args:
        video_path: Path of the source media file.
        audio_path: Path of the MP3 file to create.
        speed_factor: Playback speed multiplier applied to the frame rate
            (default 1.2, the value previously hard-coded).
        bitrate: Bitrate passed to the MP3 export (default ``"96k"``).

    Raises:
        ValueError: If *speed_factor* is not positive.
    """
    if speed_factor <= 0:
        raise ValueError("speed_factor must be positive")

    audio = AudioSegment.from_file(video_path)
    audio = audio.set_channels(1)
    # Same samples, higher declared frame rate => faster playback.
    new_frame_rate = int(audio.frame_rate * speed_factor)
    audio = audio._spawn(audio.raw_data, overrides={'frame_rate': new_frame_rate})
    audio.export(audio_path, format="mp3", bitrate=bitrate)
-
-
- # Endpoint names of the iFlytek transcription REST API used by RequestApi below.
- lfasr_host = 'https://raasr.xfyun.cn/v2/api'  # base URL shared by both endpoints
- api_upload = '/upload'  # path appended to lfasr_host to submit an audio file
- api_get_result = '/getResult'  # path appended to lfasr_host to query the transcription result
-
-
class RequestApi(object):
    """Small client for the iFlytek audio transcription REST API.

    An instance is bound to one audio file. The request timestamp and the
    signature derived from it are computed once in the constructor and reused
    for every request made by the instance.
    """

    def __init__(self, appid, secret_key, upload_file_path):
        """Store the credentials and audio path and pre-compute the signature.

        Args:
            appid: Application id issued by the platform.
            secret_key: Secret key used to sign requests.
            upload_file_path: Path of the audio file to transcribe.
        """
        self.appid = appid
        self.secret_key = secret_key
        self.upload_file_path = upload_file_path
        # The timestamp is an input of the signature, so both are fixed here.
        self.ts = str(int(time.time()))
        self.signa = self.get_signa()

    def get_signa(self):
        """Return the request signature.

        The signature is ``base64(HMAC-SHA1(key=secret_key,
        msg=md5_hex(appid + ts)))``, decoded to ``str``.
        """
        md5_hex = hashlib.md5((self.appid + self.ts).encode('utf-8')).hexdigest()
        digest = hmac.new(
            self.secret_key.encode('utf-8'),
            md5_hex.encode('utf-8'),
            hashlib.sha1,
        ).digest()
        return base64.b64encode(digest).decode('utf-8')

    def _auth_params(self):
        """Return the query parameters every request carries."""
        return {'appId': self.appid, 'signa': self.signa, 'ts': self.ts}

    @staticmethod
    def _raw_transcript_path(audio_path):
        """Return ``<audio path without extension>1.txt``.

        Only the final extension is swapped. The previous
        ``str.replace(".mp3", "1.txt")`` rewrote ``.mp3`` anywhere in the path
        and, for a non-``.mp3`` file, returned the audio path itself so the
        transcript overwrote the audio.
        """
        return os.path.splitext(audio_path)[0] + "1.txt"

    def upload(self):
        """Upload the audio file and return the decoded JSON response."""
        upload_file_path = self.upload_file_path
        file_len = os.path.getsize(upload_file_path)
        file_name = os.path.basename(upload_file_path)

        param_dict = self._auth_params()
        param_dict["fileSize"] = file_len
        param_dict["fileName"] = file_name
        param_dict["duration"] = "200"

        # Close the file deterministically instead of leaking the handle.
        with open(upload_file_path, 'rb') as audio_file:
            data = audio_file.read()

        response = requests.post(
            url=lfasr_host + api_upload + "?" + urllib.parse.urlencode(param_dict),
            headers={"Content-type": "application/json"},
            data=data,
        )
        return json.loads(response.text)

    def get_result(self, poll_interval=5):
        """Upload the file, wait for the transcription and save it to disk.

        The result endpoint is queried until the reported order status is no
        longer ``3``. The ``orderResult`` field of the last response is then
        written (UTF-8) next to the audio file, using the audio file's name
        with its extension replaced by ``1.txt``.

        Args:
            poll_interval: Seconds to sleep between two status queries. The
                loop previously re-queried immediately, hammering the service
                for as long as the order was being processed.

        Returns:
            The decoded JSON of the last status response.
        """
        uploadresp = self.upload()
        orderId = uploadresp['content']['orderId']

        param_dict = self._auth_params()
        param_dict['orderId'] = orderId
        param_dict['resultType'] = "transfer"
        # The query string never changes between polls; build it once.
        url = lfasr_host + api_get_result + "?" + urllib.parse.urlencode(param_dict)

        while True:
            response = requests.post(
                url=url,
                headers={"Content-type": "application/json"},
                timeout=360,
            )
            result = json.loads(response.text)
            status = result['content']['orderInfo']['status']
            if status != 3:
                break
            time.sleep(poll_interval)

        txtname = self._raw_transcript_path(self.upload_file_path)
        with open(txtname, 'w', encoding='utf-8') as f:
            f.write(result['content']['orderResult'])

        return result
-
-
- #sk-bDOV3ygxCgvDACVLAOj3u7Gpw3YP90ssdn92E0KvnFeMpKap
-
- #月之暗面
-
def get_zaiyao(reportpath):
    """Generate a teaching summary for a transcript via the Moonshot (Kimi) API.

    Uploads ``<reportpath>.txt`` to the file-extract endpoint, sends the
    extracted content together with a fixed Chinese prompt to the
    ``moonshot-v1-32k`` chat model, and writes the model's answer to
    ``<reportpath>.json``. Afterwards the intermediate files
    ``<reportpath>.mp3``, ``<reportpath>.txt`` and ``<reportpath>1.txt`` are
    deleted if they exist.

    Args:
        reportpath: Path prefix (directory + base name, no extension) shared
            by the input transcript and all generated files.
    """
    client = OpenAI(
        # SECURITY: a secret should not live in source control. The
        # MOONSHOT_API_KEY environment variable takes precedence; the literal
        # is kept only as a fallback so existing deployments keep working.
        api_key=os.environ.get(
            "MOONSHOT_API_KEY",
            "sk-bDOV3ygxCgvDACVLAOj3u7Gpw3YP90ssdn92E0KvnFeMpKap",
        ),
        base_url="https://api.moonshot.cn/v1",
    )
    filename = reportpath + '.txt'
    file_object = client.files.create(file=Path(filename), purpose="file-extract")

    # Fetch the extracted text of the uploaded file.
    file_content = client.files.content(file_id=file_object.id).text

    # The extracted file content is passed as a second system message.
    messages = [
        {
            "role": "system",
            "content": "你是 Kimi,由 Moonshot AI 提供的人工智能助手,你更擅长中文和英文的对话。你会为用户提供安全,有帮助,准确的回答。同时,你会拒绝一切涉及恐怖主义,种族歧视,黄色暴力等问题的回答。Moonshot AI 为专有名词,不可翻译成其他语言。",
        },
        {
            "role": "system",
            "content": file_content,
        },
        {"role": "user",
         "content":"上面这个文件是一段教学音频识别出来的Json格式文件,其中bg_time键是一段话的起始时间。现拟将这段教学文字自动解析和提取教学核心内容,生成一段500字以内的文本摘要,概述教学流程(如首先,其次,接着,随后,最后),并使用mm:ss格式标注出在每段话的起始时间点位,用于帮助用户快速把握教学目标、教学内容、教学过程和教学特色。并给出3-5个教学特色关键词(可涉及教学方法、教学内容)。请你以教学评价专家的身份,帮我处理该文件。",
         }
    ]

    # Ask the chat model for the summary.
    completion = client.chat.completions.create(
        model="moonshot-v1-32k",
        messages=messages,
        temperature=0.3,
    )

    # Write as UTF-8 like every other file in this script; the platform
    # default encoding may be unable to represent the (Chinese) answer.
    with open(reportpath + '.json', 'w', encoding='utf-8') as f:
        f.write(completion.choices[0].message.content)

    # Remove the intermediate artefacts now that the summary exists.
    for suffix in ('.mp3', '.txt', '1.txt'):
        leftover = Path(reportpath + suffix)
        if leftover.exists():
            leftover.unlink()
- #print(completion.choices[0].message.content)
-
-
- # 输入讯飞开放平台的appid,secret_key和待转写的文件路径
if __name__ == '__main__':
    # Command line: <script> <video_dir> <name> <report_dir>
    #   <video_dir>/<name>.mp4 is the input video; every generated file is
    #   created as <report_dir>/<name><suffix>.
    print(sys.argv)
    if len(sys.argv) < 4:
        # Fail with a readable message instead of a bare IndexError.
        sys.exit("usage: %s <video_dir> <name> <report_dir>" % sys.argv[0])
    video_dir, name, report_dir = sys.argv[1:4]

    # os.path.join instead of a hard-coded "\\" so the script also works on
    # platforms whose path separator is not a backslash.
    report_base = os.path.join(report_dir, name)
    audio_file = report_base + '.mp3'

    extract_audio_from_video(os.path.join(video_dir, name + '.mp4'), audio_file)

    # SECURITY: credentials should come from the environment; the literals
    # remain only as a fallback so existing invocations keep working.
    api = RequestApi(appid=os.environ.get("XFYUN_APPID", "4f50a2a4"),
                     secret_key=os.environ.get("XFYUN_SECRET_KEY",
                                               "ba8734e4ab6f036057dfc065efd7bc0e"),
                     upload_file_path=audio_file)
    api.get_result()

    # Condense the raw transcript (<name>1.txt) into <name>.txt.
    generate_data(report_base + '1.txt', report_base + '.txt')

    # Produce the summary (<name>.json) and clean up intermediate files.
    get_zaiyao(report_base)
|