В этой статье представлены некоторые Python Часто используемые пакеты голосовой связи и способы звонка поставщику облачных услуг. API 进行распознавание речи
В основном используется пакет pyaudio, который может записывать/воспроизводить звук в виде байтового потока.
Установить:
pip install pyaudio
Список устройств, которые могут записывать
import pyaudio
p = pyaudio.PyAudio()
# Get the number of audio I/O devices
devices = p.get_device_count()
for i in range(devices):
device_info = p.get_device_info_by_index(i)
if device_info.get('maxInputChannels') > 0:
print(f"{device_info.get('index')}: {device_info.get('name')}")
начинатьзапись 5 Секунды, сохраните записанный звук в io.BytesIO в объекте
import io
FORMAT = pyaudio.paInt16 # format of audio samples
CHANNELS = 1 # audio channels(1: mono, 2: stereo)
RATE=44100 # sample rate
CHUNK=1024 # number of frames per buffer
RECORD_SECONDS = 5
p = pyaudio.PyAudio()
stream = p.open(
format=FORMAT, # format of audio samples
channels=CHANNELS, # audio channels(1: mono, 2: stereo)
rate=RATE, # sample rate
frames_per_buffer=CHUNK, # number of frames per buffer
input=True,
)
print("Recording...")
buffer = io.BytesIO()
for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS))
data = stream.read(CHUNK)
buffer.write(data)
stream.stop_stream()
stream.close()
p.terminate()
Используйте стандартную библиотеку wave пакет сохраняет аудиобайты в wav файл, он будет wav Формат записи в заголовок файла,Посмотреть подробностидокумент:The Python Standard Library - wave
import wave
with wave.open(output, 'wb') as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
wf.setframerate(RATE)
wf.writeframes(data)
print(f"Recording saved as {output}")
Если вам не нужно записывать в файл и вы просто хотите, чтобы он помог добавить заголовок wav-файла в поток байтов, вы можете сделать следующее
output = io.BytesIO()
with wave.open(output, 'wb') as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
wf.setframerate(RATE)
wf.writeframes(data)
output.getvalue()
Тенсент Облакоизраспознавание Видов сервисов много, здесь я попробовал только "Распознание" одним «предложениям» и «запись версии скорости распознавания файлов»
ссылкадокумент,Волязапись Байты конвертируются в base64 После кодирования передать API Отправьте через интерфейс и вы сможете получить результаты распознавания
Обратите внимание, что байтовые данные здесь должны включать соответствующий заголовок файла формата файла, то есть, если это указанное выше pyaudio Чтобы получить поток байтов, вам нужно сначала использовать wave Модуль добавляет заголовок файла, иначе Тенент Интерфейс Облако сообщит об ошибке распознавания формата.
import base64
import json
from tencentcloud.common.common_client import CommonClient
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.common.profile.client_profile import ClientProfile
try:
cred = credential.Credential(SECRET_ID, SECRET_KEY)
httpProfile = HttpProfile()
httpProfile.endpoint = "asr.tencentcloudapi.com"
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
encoded = base64.b64encode(sentence_bytes).decode()
print(encoded)
data = {
'EngSerViceType': '16k_zh',
'SourceType': 1,
'VoiceFormat': 'wav',
'Data': encoded,
'DataLen': len(sentence_bytes)
}
params = json.dumps(data)
common_client = CommonClient("asr", "2019-06-14", cred, "", profile=clientProfile)
print(common_client.call_json("SentenceRecognition", json.loads(params)))
except TencentCloudSDKException as err:
print(err)
ссылкаОфициальный примеркод,
import requests
import hmac
import hashlib
import base64
import time
import json
from tencentcloud.common.profile.http_profile import HttpProfile
class FlashRecognitionRequest:
def __init__(self, engine_type):
self.engine_type = engine_type
self.speaker_diarization = 0
self.hotword_id = ""
self.customization_id = ""
self.filter_dirty = 0
self.filter_modal = 0
self.filter_punc = 0
self.convert_num_mode = 1
self.word_info = 0
self.voice_format = ""
self.first_channel_only = 1
self.reinforce_hotword = 0
self.sentence_max_length = 0
def set_first_channel_only(self, first_channel_only):
self.first_channel_only = first_channel_only
def set_speaker_diarization(self, speaker_diarization):
self.speaker_diarization = speaker_diarization
def set_filter_dirty(self, filter_dirty):
self.filter_dirty = filter_dirty
def set_filter_modal(self, filter_modal):
self.filter_modal = filter_modal
def set_filter_punc(self, filter_punc):
self.filter_punc = filter_punc
def set_convert_num_mode(self, convert_num_mode):
self.convert_num_mode = convert_num_mode
def set_word_info(self, word_info):
self.word_info = word_info
def set_hotword_id(self, hotword_id):
self.hotword_id = hotword_id
def set_customization_id(self, customization_id):
self.customization_id = customization_id
def set_voice_format(self, voice_format):
self.voice_format = voice_format
def set_sentence_max_length(self, sentence_max_length):
self.sentence_max_length = sentence_max_length
def set_reinforce_hotword(self, reinforce_hotword):
self.reinforce_hotword = reinforce_hotword
class FlashRecognizer:
def __init__(self):
pass
def _format_sign_string(self, param):
signstr = "POSTasr.cloud.tencent.com/asr/flash/v1/"
for t in param:
if 'appid' in t:
signstr += str(t[1])
break
signstr += "?"
for x in param:
tmp = x
if 'appid' in x:
continue
for t in tmp:
signstr += str(t)
signstr += "="
signstr = signstr[:-1]
signstr += "&"
signstr = signstr[:-1]
return signstr
def _build_header(self):
header = dict()
header["Host"] = "asr.cloud.tencent.com"
return header
def _sign(self, signstr, secret_key):
hmacstr = hmac.new(secret_key.encode('utf-8'),
signstr.encode('utf-8'), hashlib.sha1).digest()
s = base64.b64encode(hmacstr)
s = s.decode('utf-8')
return s
def _build_req_with_signature(self, secret_key, params, header):
query = sorted(params.items(), key=lambda d: d[0])
signstr = self._format_sign_string(query)
signature = self._sign(signstr, secret_key)
header["Authorization"] = signature
requrl = "https://"
requrl += signstr[4::]
return requrl
def _create_query_arr(self, req):
query_arr = dict()
query_arr['appid'] = APP_ID
query_arr['secretid'] = SECRET_ID
query_arr['timestamp'] = str(int(time.time()))
query_arr['engine_type'] = req.engine_type
query_arr['voice_format'] = req.voice_format
query_arr['speaker_diarization'] = req.speaker_diarization
query_arr['hotword_id'] = req.hotword_id
query_arr['customization_id'] = req.customization_id
query_arr['filter_dirty'] = req.filter_dirty
query_arr['filter_modal'] = req.filter_modal
query_arr['filter_punc'] = req.filter_punc
query_arr['convert_num_mode'] = req.convert_num_mode
query_arr['word_info'] = req.word_info
query_arr['first_channel_only'] = req.first_channel_only
query_arr['reinforce_hotword'] = req.reinforce_hotword
query_arr['sentence_max_length'] = req.sentence_max_length
return query_arr
def recognize(self, req, data):
header = self._build_header()
query_arr = self._create_query_arr(req)
req_url = self._build_req_with_signature(SECRET_KEY, query_arr, header)
r = requests.post(req_url, headers=header, data=data)
return r.text
recognizer = FlashRecognizer()
# Создать новый запрос на идентификацию
req = FlashRecognitionRequest('16k_zh')
req.set_filter_modal(0)
req.set_filter_punc(0)
req.set_filter_dirty(0)
req.set_voice_format("wav")
req.set_word_info(0)
req.set_convert_num_mode(1)
#Выполнить идентификацию
resultData = recognizer.recognize(req, audio_bytes)
resp = json.loads(resultData)
request_id = resp["request_id"]
code = resp["code"]
if code != 0:
print("recognize faild! request_id: ", request_id, " code: ", code, ", message: ", resp["message"])
return ''
#One Channell_result соответствует результату распознавания одного канала
#Большая часть звука монофоническая, что соответствует Channl_result
try:
result = resp["flash_result"][0]['text']
except Exception as e:
print(f'parse error: {e}')
Как и в случае с «Распознаванием одним предложением», загружаемые данные также должны содержать заголовок файла соответствующего формата.
Пробовал здесьiFlytekиз实时Транскрипция речиинтерфейс,проходить websocket путь, направьте поток байтов на websocket сервер и принять результаты распознавания
这里ссылка了Официальный пример,Используйте вместе async/await переписал программу
from datetime import datetime
import time
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import asyncio
import base64
import hashlib
import hmac
import websockets
import json
STATUS_FIRST_FRAME = 0 # Идентификатор первого кадра
STATUS_CONTINUE_FRAME = 1 # Идентификатор промежуточного кадра
STATUS_LAST_FRAME = 2 # Идентификатор последнего кадра
def get_url(app_key: str, app_secret: str) -> str:
url = 'wss://ws-api.xfyun.cn/v2/iat'
# Создать временную метку в формате RFC1123.
now = datetime.now()
date = format_date_time(time.mktime(now.timetuple()))
# Объединение строк
signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
signature_origin += "date: " + date + "\n"
signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
# Зашифровать с помощью hmac-sha256
signature_sha = hmac.new(app_secret.encode('utf-8'), signature_origin.encode('utf-8'),
digestmod=hashlib.sha256).digest()
signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
app_key, "hmac-sha256", "host date request-line", signature_sha)
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
# Объедините запрошенные параметры аутентификации в словарь.
v = {
"authorization": authorization,
"date": date,
"host": "ws-api.xfyun.cn"
}
# Соедините параметры аутентификации для генерации URL-адреса
url = url + '?' + urlencode(v)
return url
async def iflytek_recognition(data: bytes):
url = get_url(app_key=APP_KEY, app_secret=APP_SECRET)
async with websockets.connect(url) as ws:
frameSize = 8000 # размер звука на кадр
intervel = 0.04 # Интервал отправки аудио (единица измерения: с)
status = STATUS_FIRST_FRAME # Информация о состоянии звука, определяющая, является ли звук первым кадром, средним кадром или последним кадром.
common_args = {"app_id": APP_ID}
business_args = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo":1, "vad_eos":10000}
i = 0
while True:
buf = data[i*frameSize:(i+1)*frameSize]
i += 1
print(len(buf))
# конец файла
if not buf:
status = STATUS_LAST_FRAME
# Первая обработка кадра
# Отправьте первый кадр аудио вместе с бизнесом параметр
# appid Должен быть доставлен, отправляется только первый кадр
if status == STATUS_FIRST_FRAME:
d = {"common": common_args,
"business": business_args,
"data": {"status": 0, "format": "audio/L16;rate=16000",
"audio": str(base64.b64encode(buf), 'utf-8'),
"encoding": "raw"}}
d = json.dumps(d)
await ws.send(d)
status = STATUS_CONTINUE_FRAME
# Промежуточная обработка кадров
elif status == STATUS_CONTINUE_FRAME:
d = {"data": {"status": 1, "format": "audio/L16;rate=16000",
"audio": str(base64.b64encode(buf), 'utf-8'),
"encoding": "raw"}}
await ws.send(json.dumps(d))
# Обработка последнего кадра
elif status == STATUS_LAST_FRAME:
d = {"data": {"status": 2, "format": "audio/L16;rate=16000",
"audio": str(base64.b64encode(buf), 'utf-8'),
"encoding": "raw"}}
await ws.send(json.dumps(d))
break
# Интервал дискретизации аналогового звука
time.sleep(intervel)
message = await ws.recv()
result = ''
try:
code = json.loads(message)["code"]
sid = json.loads(message)["sid"]
if code != 0:
errMsg = json.loads(message)["message"]
print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
else:
data = json.loads(message)["data"]["result"]["ws"]
# print(json.loads(message))
for i in data:
for w in i["cw"]:
result += w["w"]
print("sid:%s call success!,data is:%s" % (sid, json.dumps(data, ensure_ascii=False)))
except Exception as e:
print("receive msg,but parse exception:", e)
return result
async def main():
with open('sample.wav', 'rb') as f:
await iflytek_recognition(f.read())
Здесь напишите функцию для управления переключением записи через пробел на клавиатуре и распечатайте распознавание результаты речи demo
Мониторинг клавиатуры использует пакет pynput, который прослушивает события клавиатуры и отвечает через поток.
Установить
pip install pynput
Полный код выглядит следующим образом
import pyaudio
import wave
from pynput import keyboard
import threading
import io
from list_devices import list_devices
from tencent import sentence_recognition, flash_recognition
from iflytek import iflytek_recognition
import asyncio
FORMAT = pyaudio.paInt16 # format of audio samples
CHANNELS = 1 # audio channels(1: mono, 2: stereo)
RATE=16000 # sample rate
CHUNK=1024 # number of frames per buffer
is_recording = False
device_index = 0
def save(data: bytes, output: str):
with wave.open(output, 'wb') as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
wf.setframerate(RATE)
wf.writeframes(data)
print(f"Recording saved as {output}")
def to_wav_bytes(data: bytes) -> bytes:
output = io.BytesIO()
with wave.open(output, 'wb') as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
wf.setframerate(RATE)
wf.writeframes(data)
return output.getvalue()
def do_record(device_index: int, output: str):
global is_recording
print("Recording...")
p = pyaudio.PyAudio()
buffer = io.BytesIO()
stream = p.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
frames_per_buffer=CHUNK,
input=True,
input_device_index=device_index,
)
while is_recording:
data = stream.read(CHUNK)
buffer.write(data)
stream.stop_stream()
stream.close()
print("Finished.")
p.terminate()
data = buffer.getvalue()
# result = sentence_recognition(to_wav_bytes(data))
result = flash_recognition(to_wav_bytes(data))
# result = asyncio.run(iflytek_recognition(data))
print(result)
save(data, output)
def on_release(key):
global is_recording, device_index
try:
if key == keyboard.Key.space:
if not is_recording:
is_recording = True
threading.Thread(target=do_record, args=(device_index, 'output.wav')).start()
else:
is_recording = False
elif key == keyboard.Key.esc:
is_recording = False
# Stop listener
return False
except AttributeError:
print('special key {0} pressed'.format(
key))
def main():
global device_index
list_devices()
device_index = int(input("Please select input device:"))
with keyboard.Listener(on_release=on_release) as listener:
listener.join()
if __name__ == '__main__':
main()
попробуй это,Версия скорости распознавания файлов Тенсента Облако очень быстрая.,Сценарии голосовых порталов, подходящие для повседневных простых приложений