Привет!
Изучая четвёртый язык, я в очередной раз решил попробовать обучить свою биологическую нейросеть на книгах с параллельным переводом, но после пары вечеров в такой же очередной раз их оставил. Подобный подход, когда переводом сопровождается каждое предложение, кажется несколько избыточным и мешающим погружению, и если в текстовом варианте можно хотя бы перескочить взглядом через перевод, то для прочих форматов, например, для любимых мною аудиокниг, этот подход не сработает в принципе. Самый популярный вариант «обучающего перевода», которым пользовались и вы, – интерактивный, в котором пользователь следит за текстом на языке оригинала, и сам раскрывает переводы и пояснения забытых или новых для себя слов. Можно ли совместить эти подходы, взяв преимущества каждого, и переложить их в формат аудиокниги? Этим сегодня и займёмся.
Для реализации подобного частичного перевода мы могли бы ориентироваться на «сложность» слов, предлагая к переводу только те, которые, потенциально, не знакомы слушателям с уровнем, скажем, A2. Дальше могли бы играться с порогом пресловутой сложности, подбирать кому попроще, кому посложнее… Но это всё отталкивается от среднего по больнице, которое в действительности не удовлетворит никого.
Было бы куда интереснее, если бы мы могли ориентироваться на словарный запас каждого конкретного пользователя, и предлагали к переводу только новые, или давно не встречавшиеся, по интервальной методике, слова. И мы можем это сделать, если замкнём весь цикл вокруг одного пользователя: обрабатываем текст на основе личного словаря пользователя, по которому генерируем аудиокнигу с персонализированным переводом, словами этой генерации расширяем словарь для следующих текстов, goto 0.
В первом приближении для этой задачи нам предстоит реализовать части предварительной обработки текста, перевода, синтеза речи, и собрать всё это воедино, добавив чуть логики, о которой мы говорили абзацем выше. С существующими библиотеками это приключение на 3 импорта и 20 минут, не так ли? Не совсем. Будет много кода, чтобы покрыть все важные аспекты. С полной же реализацией можно ознакомиться и опробовать на гитхабе.
Стоит заранее оговориться, что автор несколько поверхностно погружён в область NLP, потому горячо приветствуются все замечания, подсказки и наводки, как по общему принципу работы, так и по отдельным частям реализации.
❯ Перевод
Что же, приступим. Первым логическим этапом, как ни странно, будет перевод, так как к шагу обработки мы должны подходить уже с готовыми предложениями на двух языках.
Здесь мы можем оставить выбор за пользователем, реализовав различные возможности перевода. Среди существующих инструментов перевода лучше прочих себя зарекомендовали ArgosTranslate и Google Cloud. У первого хоть результат немного и отдаёт потраченными переводами, но среди бесплатных локальных инструментов, пожалуй, лучший. У GC на бесплатном тарифе существует лимит в 500.000 символов в месяц, но качество на порядок выше, хотя до уровня DeepL ещё можно расти.
Лучшим же вариантом будет использование литературного, профессионального перевода. Предполагается, что пользователь предоставит его вместе с текстом оригинала. Не в виде параллельного текста, иначе зачем мы здесь, а просто отдельный перевод. Это подводит нас к задаче выравнивания предложений, то есть сопоставления, какие предложения оригинала соответствуют каким предложениям перевода. Разумеется, проитерироваться через zip(оригинал, перевод)
самую малость недостаточно. Несколько предложений на языке оригинала могут быть переведены одним, или наоборот. В литературном переводе это особенно актуально.
Среди опробованных инструментов выравнивания с предобученными моделями отлично себя показал SentenceTransformers с моделью paraphrase-multilingual-MiniLM-L12-v2
. Получив с его помощью сходства между предложениями оригинала и перевода, сопоставим их друг с другом, обращаясь только к соседним предложениям.
def get_literary_translation_from_file(self, original_sentences: list[str]) -> tuple[list[str], list[str]]:
with open(config._root_dir / 'input_translation.txt', 'r', encoding='utf-8') as file:
text = file.read()
translated_sentences = self.text_processor.get_sentences(text)
sentences = original_sentences + translated_sentences
embeddings = self.model.encode(sentences) # self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
similarities = self.model.similarity(embeddings, embeddings)
result_idxs = []
nh, nf = len(original_sentences), len(sentences)
src_idx, trg_idx = 0, nh
while src_idx < nh and trg_idx < nf:
next_src_score = similarities[src_idx + 1][trg_idx] if src_idx + 1 < nh else 0
next_trg_score = similarities[src_idx][trg_idx + 1] if trg_idx + 1 < nf else 0
next_both_score = similarities[src_idx + 1][trg_idx + 1] if src_idx + 1 < nh and trg_idx + 1 < nf else 0
result_idxs.append((src_idx, trg_idx - nh))
if next_src_score > next_both_score:
src_idx += 1
elif next_trg_score > next_both_score:
trg_idx += 1
else:
src_idx += 1
trg_idx += 1
result = []
prev_src, prev_trg = -1, -1
for src, trg in result_idxs:
if src == prev_src:
result[-1][1] += f' {translated_sentences[trg]}'
elif trg == prev_trg:
result[-1][0] += f' {original_sentences[src]}'
else:
result.append([original_sentences[src], translated_sentences[trg]])
prev_src, prev_trg = src, trg
return zip(*result)
После обращения к модели мы получаем матрицу соответствий всех предложений, не только между языками, но и внутри них. Таким образом, нужные нам оценки соответствий будут находиться примерно во второй четверти, по которой пройдём «змейкой», собирая соседние предложения.
Так как мы рассматриваем только элементы [i+1, j]
, [i, j+1]
, [i+1, j+1]
, случаи, где в переводе кардинально меняется порядок предложений, могут привести к ошибочному выравниванию, однако игнорирование подобных редких случаев позволит нам избежать регулярных ложно-положительных сопоставлений из совершенно разных участков текста.
Возвращаясь к более общей задаче, стоит также позаботиться о передаче перевода из сессии в сессию, что особенно важно при обращениям к переводчику с системой лимитов, да и просто ускорит генерации при повторных вызовах. В общем виде наш класс с этими методами выглядит следующим образом:
class Translator:
def __init__(self, container) -> None:
self.text_processor = container.text_processor
match config.translation_provider:
case 'GoogleCloud':
provider = GCTranslateProvider
case 'Argos':
provider = ArgosTranslateProvider
self.provider = provider(config.source_lang, config.target_lang)
if config.use_translation_file == 1:
self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
@cache
def translate(self, text: str) -> str:
return self.provider.translate(text)
@staticmethod
def _get_stable_hash(text: str) -> str:
return hashlib.sha256(text.encode('utf-8')).hexdigest()
def process_sentences(self, original_sentences: list[str]) -> list[str]:
match config.use_translation_file:
case 0:
self.translated = {
self._get_stable_hash(src): self.translate(src.rstrip()) for src in original_sentences
}
case 1:
result_src, result_trg = self.get_literary_translation_from_file(original_sentences)
self.translated = {self._get_stable_hash(src): trg for src, trg in zip(result_src, result_trg)}
return result_src
case 2:
with open(config._root_dir / 'temp' / 'translation.pkl', 'rb') as file:
self.translated = pickle.load(file)
return original_sentences
def save_translation_to_file(self) -> None:
with open(config._root_dir / 'temp' / 'translation.pkl', 'wb') as file:
pickle.dump(self.translated, file)
def get_translated_sentence(self, sentence: str) -> str:
return self.translated[self._get_stable_hash(sentence)]
def get_literary_translation_from_file(self, original_sentences: list[str]) -> tuple[list[str], list[str]]:
…
Классы GCTranslateProvider
и ArgosTranslateProvider
просто композиты с реализованным методом перевода, ничего примечательного.
В process_sentences()
мы по значению из конфигурационного файла выбираем путь обработки – прямой перевод, переиспользование сохранённого ранее перевода, либо выравнивание с литературным.
Перевод выполняем сразу для всех предложений, сохраняя их по стабильному хэшу. В этом виде их и записываем в файловую систему, или загружаем сохранённые ранее.
Из основного метода также возвращаем список оригинальных предложений, так как они могли несколько измениться, в случае выравнивания с литературным переводом.
❯ Подготовка токенов и эмбеддингов
Теперь можем перейти к обработке. Для базовых операций возьмём spaCy, раздельно загрузим модели для каждого из языков (для примера английского и русского языков выберем en_core_web_sm
и ru_core_news_sm
). Выбор моделей, языков и ещё кучи всего оставляем на выбор пользователя через отдельный config
.
Забегая чуть вперёд, помимо самих токенов, которые мы получим от spacy, нам также понадобятся их эмбеддинги. С этим spacy нам не поможет, потому обратимся к transformers с моделью bert-base-multilingual-cased
. Так как токенизация bert подсловная, а мы хотим работать с целыми словами, нам нужно после получения эмбеддингов обратно собрать наши подслова в целые, собрав и значения эмбеддингов от каждой составляющей. Полученные эмбеддинги добавим к токенам spacy, для чего нам понадобится сопоставить токены полученные от разных токенизаторов:
def add_tokens_embeddings(self, sentence: str, spacy_tokens: list[Token]) -> None:
spacy_idx = {j: i for i, t in enumerate(spacy_tokens) for j in range(t.idx, t.idx + len(t.text))}
tokenized_input = self.embedding_tokenizer(
sentence, return_tensors='pt', padding=True, truncation=True, return_offsets_mapping=True
)
offsets = tokenized_input.pop('offset_mapping')[0]
with torch.no_grad():
model_output = self.embedding_model(**tokenized_input, output_hidden_states=True)
embeddings = torch.mean(torch.stack(model_output.hidden_states[-4:]), dim=0).squeeze(0)
bert_tokens = self.embedding_tokenizer.convert_ids_to_tokens(tokenized_input.input_ids[0])
aggregators = {…}
aggregator = aggregators[config.embedding_aggregator.lower()](embeddings, len(bert_tokens))
subword_count = 0
current_idx = 0
for i, token in enumerate(bert_tokens):
if offsets[i][0] == offsets[i][1]:
continue
start, end = spacy_idx[offsets[i][0].item()], spacy_idx[offsets[i][1].item() - 1]
if start != end:
raise RuntimeError(f'Intersect error. Bert "{token}", {offsets[i]} (spacy: {start}, {end})')
if start == current_idx:
aggregator.append(i)
subword_count += 1
continue
if subword_count:
spacy_tokens[current_idx]._.embedding = aggregator.get_final_embedding(subword_count)
aggregator.start_new_word(i)
subword_count = 1
current_idx = start
if subword_count:
spacy_tokens[current_idx]._.embedding = aggregator.get_final_embedding(subword_count)
Получаем эмбеддинги, проходим по всем токенам, собирая подслова воедино по позициям токенов spacy, присваиваем токенам новый атрибут. Общий вид механизма агрегации:
class BaseAggregator:
def __init__(self, embeddings, n):
self.embeddings = embeddings
self.n = n
self.current_embeddings = []
def start_new_word(self, idx):
self.current_embeddings = [self.embeddings[idx]]
def append(self, idx):
self.current_embeddings.append(self.embeddings[idx])
def get_final_embedding(self, subword_count):
return torch.mean(torch.stack(self.current_embeddings), dim=0)
Некоторые атрибуты пока не используются, но пригодятся дальше. Здесь у нас обычное усреднение, но это, конечно, не единственный подход. Также мы можем применить подходы min/max pooling, выбирая только минимальные или максимальные значения из подслов, можем добавить веса для токенов через механизм внимания с L2-нормализацией, или предобработку токенов через центрирование.
К сожалению, после сравнения результатов оказалось, что всё перечисленное не слишком разительно отличается от простого усреднения, которое и вовсе даёт лучший вывод.
Тем не менее, закрепим перечисленные подходы в реализации для дальнейшего испытания практикой:
class MaxPooling(BaseAggregator):
def get_final_embedding(self, _):
return torch.max(torch.stack(self.current_embeddings), dim=0).values
class MinPooling(BaseAggregator):
def get_final_embedding(self, _):
return torch.min(torch.stack(self.current_embeddings), dim=0).values
class Attention(BaseAggregator):
def __init__(self, embeddings, n):
super().__init__(embeddings, n)
self.attention_scores = torch.zeros(self.n, dtype=torch.float32)
def start_new_word(self, idx):
super().start_new_word(idx)
self.attention_scores = torch.zeros(self.n, dtype=torch.float32)
self.attention_scores[0] = torch.norm(self.embeddings[idx], p=2)
def append(self, idx):
super().append(idx)
attention_score = torch.matmul(self.embeddings[idx], torch.mean(torch.stack(self.current_embeddings), dim=0))
l2_norm = torch.norm(self.embeddings[idx], p=2)
self.attention_scores[len(self.current_embeddings) - 1] = attention_score * l2_norm
def get_final_embedding(self, subword_count):
if torch.sum(self.attention_scores) != 0:
attention_weights = torch.nn.functional.softmax(self.attention_scores[:subword_count], dim=0)
return torch.sum(attention_weights.unsqueeze(1) * torch.stack(self.current_embeddings), dim=0)
return super().get_final_embedding(subword_count)
class TextProcessing:
def add_tokens_embeddings(…):
…
embeddings = torch.mean(torch.stack(model_output.hidden_states[-4:]), dim=0).squeeze(0)
bert_tokens = self.embedding_tokenizer.convert_ids_to_tokens(tokenized_input.input_ids[0])
if config.embedding_preprocessing_center:
mean_embedding = torch.mean(embeddings, dim=0)
embeddings = embeddings - mean_embedding
aggregators = {
'averaging': Averaging,
'maxpooling': MaxPooling,
'minpooling': MinPooling,
'attention': Attention,
}
aggregator = aggregators[config.embedding_aggregator.lower()](embeddings, len(bert_tokens))
…
Есть ещё один важный нюанс – хоть токенизатор используемый в spacy не является подсловным, некоторые сложные слова он всё же разбивает на несколько токенов, и, что ещё осложняет дело, его разбиение отличается от bert. В английском языке это сразу же выдаст ошибку на всех словах с апострофами, варианте написания cannot, и ещё некоторых случаях. К тому же, для наших целей было бы неплохо объединить последовательности пунктуационных токенов и слова с написанием через дефис, которые мы хотим рассматривать как единую сущность. Для всех этих целей нам стоит немного вмешаться в процесс токенизации spacy:
def _merge_tokens(doc: Doc) -> list[Token]:
spans_to_merge = []
i = 0
n = len(doc)
while i < n:
token = doc[i]
cur_text = token.text.lower()
prev_text = doc[i - 1].text.lower() if i > 0 else ''
next_text = doc[i + 1].text.lower() if i < n - 1 else ''
if ({''', '’'} & {cur_text[0], cur_text[-1]}) and i > 0:
spans_to_merge.append(doc[i - 1 : i + 1])
elif i < n - 1 and (
(cur_text == 'can' and next_text == 'not')
or (token.is_punct and doc[i + 1].is_punct)
or (re.match(r'^p{Sc}$', cur_text) and re.match(r'^d{1,3}([., ]d{2,3})*$', next_text))
or (re.match(r'^p{L}+$', cur_text) and re.match(r'^n['’]?t$', next_text))
):
spans_to_merge.append(doc[i : i + 2])
elif 0 < i < n - 1 and re.match(r'^p{L}-p{L}$', f'{prev_text[-1]}{cur_text}{next_text[0]}'):
start = i - 1
end = i + 2
while end < n - 1 and re.match(r'^-p{L}$', f'{doc[end].text}{doc[end + 1][0]}'):
end += 2
spans_to_merge.append(doc[start:end])
logging.debug(f'Adding span to merge (complex hyphenated chain): {doc[start:end]}')
i = end - 1
i += 1
filtered_spans = spacy.util.filter_spans(spans_to_merge)
with doc.retokenize() as retokenizer:
for span in filtered_spans:
retokenizer.merge(span)
return [token for token in doc if not token.is_space]
Здесь, и далее, используется regex as re
, благодаря чему мы можем обращаться к категориям юникода, которые не поддерживаются родным re
стандартной библиотеки.
Обратим внимание на случаи, когда слова с написанием через дефис образуют целые последовательности, вроде out-of-the-way. Их мы собираем полностью, рассматривая и все последующие совпадающие токены, начиная от среагировавшего на начальную проверку.
❯ Структуры хранения
Теперь, когда мы почти готовы перейти к основной логике, пора описать структуры, в которых мы будем хранить наши слова. Их предполагается два типа – леммы и конкретные словоформы. Это даёт нам бо́льшую гибкость при переводе и уменьшает количество повторов: когда мы видим слово впервые – мы переводим его, но если словоформа уже встречалась (и достаточно давно, чтобы мы хотели добавить её в перевод), мы также смотрим на то, когда мы в последний раз встречали родительскую лемму, и по ней мы можем отбросить данное слово из перевода, если увидим, что недавно переводили смежную форму. Конечно и интервалы повторения для лемм и словоформ должны быть разные. Базовые представления для каждого типа одни:
class BaseNode:
def __init__(self, intervals: tuple[int]) -> None:
self.intervals = intervals
self.last_pos = 0
self.level = 0
def check_repeat(self, position: int) -> bool:
if not self.level:
return True
return self.intervals[self.level] < position - self.last_pos
def update(self, new_pos: int) -> None:
self.level += 1
self.last_pos = new_pos
Мы сохраняем для наших слов и лемм последнюю позицию, в которой мы их встречали, количество встреч, и ссылку на сам список пороговых интервалов между повторениями.
Интервалы, как и многое другое, настраиваются через конфигурационный файл. Пока они не имеют большого значения и тестово имеют вид:
lemma_intervals = tuple(b - a for a, b in pairwise(int(1.05**i) for i in range(190, 600))) entity_intervals = tuple(b - a for a, b in pairwise(int(1.1**i) for i in range(190, 600)))
Так как мы храним последнюю позицию и «уровень» закрепления, а не просто расстояние до следующего повторения, мы в любой момент можем изменить интервалы, и все наши дистанции повторения будут соответствовать новым реалиям, без каких-либо дополнительных действий по переназначению оных.
В дочернем от BaseNode
классе для словоформ нам нужно лишь задать соответствующие интервалы и добавить поле, в котором будем хранить перевод:
class Entity(BaseNode):
def __init__(self, translation: str) -> None:
super().__init__(config.entity_intervals)
self.translation = translation
Леммы у нас будут выступать первыми ключами хранения, и тут мы вспомним об омонимии и полисемии, из-за которых, при хранении по одной лемме, разные сущности могут быть записаны по одному пути. Наиболее оптимальным по сложности/качеству решением этой проблемы видится использование в качестве ключа комбинированной леммы – f'{лемма слова оригинала}-{лемма слова перевода}'
, с листами в таком же виде, только уже от конкретных форм:
class LemmaDict(BaseNode):
def __init__(self) -> None:
super().__init__(config.lemma_intervals)
self.children: dict[str, LemmaDict | Entity] = {}
def add(self, lemma_src: str, text_src: str, lemma_trg: str, text_trg: str) -> tuple['LemmaDict', Entity]:
lemma_key = f'{lemma_src}-{lemma_trg}'
if lemma_key not in self.children:
self.children[lemma_key] = LemmaDict()
lemma_obj = self.children[lemma_key]
ent_key = f'{text_src}-{text_trg}'
if ent_key not in lemma_obj.children:
lemma_obj.children[ent_key] = Entity(text_trg)
return lemma_obj, lemma_obj.children[ent_key]
Здесь мы напрямую наследуемся от BaseNode
, и возвращаем из add()
не только лист, но и его «родительскую» лемму, что позволит нам в дальнейшем проверять дистанцию для каждого из них.
Давайте ещё раз заглянем немного вперёд: в тексте ведь нам постоянно будут попадаться устойчивые выражения, именованные сущности и прочие последовательности слов, которые мы не хотим переводить пословно. Конечно рассказ о музыке пещерного Ника и пожелания сломать ногу придадут комизма выходному тексту, но это не совсем то, что нам надо. Мы можем сохранять наши сущности в LemmaDict
по целой последовательности, в виде одной строки, но логичнее будет всё же сохранять всю цепочку, которая, в виде отдельных лемм, будет так же устойчива к небольшим изменениям форм. Реализовать это лучше всего будет вариацией префиксного дерева:
class LemmaTrie:
def __init__(self) -> None:
self.children: dict[str, LemmaTrie | Entity] = {}
def search(self, tokens: list['Token'], depth: int = 0, punct_tail: int = 0) -> tuple[Entity | None, int]:
if not tokens:
return None, 0
token = tokens.pop(0)
if not re.match(config.word_pattern, token.text):
return self.search(tokens, depth + 1, punct_tail + 1)
if child := self.children.get(token.lemma_):
child = child.search(tokens, depth + 1)
return child if child and child[0] else (self.children.get('#'), depth - punct_tail)
def add(self, lemmas: list[str], entity: Entity) -> Entity:
if not lemmas:
if '#' not in self.children:
self.children['#'] = entity
return self.children['#']
if lemmas[0] not in self.children:
self.children[lemmas[0]] = LemmaTrie()
return self.children[lemmas[0]].add(lemmas[1:], entity)
Поиск принимает список слов (в виде токенов), и рекурсивно ищет глубочайший лист по леммам, сохраняя глубину нахождения. Так как токены могут содержать и пунктуацию, которую мы не можем просто отбросить из общего списка, для обратной совместимости текста, мы просто пропускаем их, запоминая «пунктуационный хвост», который вычтем из счётчика глубины для найденного листа.
Добавление нового листа проходит весь путь по леммам, добавляя, если их ещё не существует на каком-то уровне, и в конце пытается назначить переданный Entity
листом дерева. Если лист по этому пути уже существует, мы его не перезаписываем, иначе потеряем «прогресс» этой сущности. Возвращаем лист, будь то новый, или существовавший до вызова метода, по которому будем проверять дистанцию повторения. Дистанция леммы в этом многословном варианте не учитывается. При вероятном конфликте переводов у существующей и переданной сущности отдаём предпочтение существующей.
❯ Основная логика
К текущему моменту мы получаем список предложений, проводим их через переводчик и получаем токены предложений для каждого из языков с дополнительными эмбеддингами. Токены предложений на двух языках инкапсулируем в класс Sentence
, где и будет наша основная логика.
В центральном методе итерируемся по токенам оригинального предложения проверяя их на условия до первого совпадения. Первым делом учтём условия пропуска токенов – если этот токен уже обработан из грядущих методов; если это токен пунктуации; и если NER spacy отметил токен как часть сущности, которую мы считаем непереводимой, например PERSON
и DATE
. Категории непереводимых сущностей оставим к редактированию в конфигурационном файле. Вдобавок к этому пропускаем артикли, которые также определяются spacy по значению Art
в token.morph.get('PronType')
. Здесь можно было применить проверку token.pos_ == 'DET'
, но она, помимо артиклей, также сработает на некоторые количественные слова, а также на указательные и притяжательные местоимения.
def process_tokens(self) -> None:
for idx_src, token_src in enumerate(self.tokens_src):
self.entity_counter += 1
logging.debug(f'Processing token: {token_src}')
if token_src in self.skip:
logging.debug('Skipping previously processed token')
elif not re.match(config.word_pattern, token_src.text):
self.entity_counter -= 1
logging.debug('Skipping punctuation')
elif token_src.ent_type_ in config.untranslatable_entities or 'Art' in token_src.morph.get('PronType'):
logging.debug(f'Skipping untranslatable entity or article: {token_src.ent_type_} – {token_src.morph.get("PronType")}')
elif …
И наконец приступаем к непосредственной обработке. Сначала проверим, существует ли в нашем lemma_trie
последовательность начинающаяся от текущего токена. Если последовательность обнаружилась, добавляем её к выводу после проверки дистанции повторения:
def process_tokens(self) -> None:
for idx_src, token_src in enumerate(self.tokens_src):
…
elif self.trie_search_and_process(idx_src):
logging.debug('Found multiword chain')
def trie_search_and_process(self, idx_src: int) -> bool:
entity, depth = self.container.lemma_trie.search(self.tokens_src[idx_src:])
if depth > 1:
self.treat_trie_entity(entity, self.tokens_src[idx_src : idx_src + depth])
return True
return False
def treat_trie_entity(self, entity: Entity, tokens_src: list['Token'], tokens_trg=None) -> None:
if entity.check_repeat(self.entity_counter):
entity.update(self.entity_counter)
self.result.append((' '.join(token.text.lower() for token in tokens_src), entity.translation))
self.skip |= set(tokens_src)
Вне зависимости от того, требуется ли закрепить переводом найденную последовательность в выводе, добавляем эти токены к пропуску на следующих итерациях, чтобы не перевести часть сущности как дополнительное отдельное слово.
Если последовательность не нашлась, переходим далее. Следующим пунктом необходимо проверить, не является ли токен началом именованной сущности. К нашей радости, и это spacy уже распознал, и нам остаётся лишь проверить значение ent_iob_
текущего и следующего токенов. Если текущий помечен как B
, а следующий I
, можем смело добавлять новую сущность:
def process_tokens(self) -> None:
for idx_src, token_src in enumerate(self.tokens_src):
…
elif self._is_start_of_named_entity(idx_src):
self.add_named_entity_to_trie(idx_src)
def _is_start_of_named_entity(self, idx_src: int) -> bool:
return (
self.tokens_src[idx_src].ent_iob_ == 'B'
and len(self.tokens_src) > idx_src + 1
and self.tokens_src[idx_src + 1].ent_iob_ == 'I'
)
def add_named_entity_to_trie(self, idx_src: int) -> None:
seq_tokens = [self.tokens_src[idx_src]]
for forw_token in self.tokens_src[idx_src + 1 :]:
if forw_token.ent_iob_ != 'I':
break
seq_tokens.append(forw_token)
translation = self.container.translator.translate(' '.join(token.text for token in seq_tokens))
entity = self.container.lemma_trie.add([token.lemma_ for token in seq_tokens], Entity(translation))
self.treat_trie_entity(entity, seq_tokens)
Собираем всю именованную последовательность и переводим её отдельным вызовом, что более надёжно, чем искать выравнивания в целых предложениях. В конце добавляем последовательность в нашу структуру и сохраняем результат в текущей генерации.
Следующей проверкой будет вхождение в список стоп-слов, который мы можем взять у nltk.corpus.stopwords(lang)
. Это условие пропуска токена, но мы выполняем эту проверку только сейчас, так как стоп-слово может быть началом фразеологизма, присутствующего в LemmaTrie
. Потому сначала проверки мультисловного вхождения, потом пропуск по стоп-слову.
Здесь немного подвешен вопрос с артиклями, ведь мы их пропускаем в начале, и можем упустить случаи, когда они могли быть распознаны как часть многословной последовательности, и, в некоторых случаях, иначе переведены, но LemmaTrie
слишком замусоривается утроением a, the, _, и это не говоря о языках с куда бо́льшим количеством артиклей.
И если мы дошли до этого шага, не прервавшись ни на одной из прошлых проверок, значит пора приступить к задаче выравнивания отдельных слов, которую вынесем в отдельный модуль. Здесь мы хотим получить соответствующие друг другу последовательности токенов оригинала и перевода, с оценкой уверенности этого соответствия. Для первоначального выравнивания воспользуемся simalign. Данный инструмент принимает на вход слова из двух предложений и возвращает список пар индексов, которые он распознал как соответствующие. Получим эти пары и переведём их в перекрёстный словарь:
def _align_tokens(self) -> tuple[dict[int, list[int]], dict[int, list[int]]]:
# self.aligner = SentenceAligner(model='xlm-roberta-base', token_type='bpe', matching_methods=config.alignment_matching_method)
align = self.aligner.get_word_aligns([t.text for t in self.tokens_src], [t.text for t in self.tokens_trg])
src_to_trg = defaultdict(list)
trg_to_src = defaultdict(list)
for a, b in align[config.alignment_matching_method]:
src_to_trg[a].append(b)
trg_to_src[b].append(a)
return src_to_trg, trg_to_src
Шаг с переводом выравниваний из обычных пар нам необходим потому, что наши токены далеко не всегда будут выровнены один-к-одному, и нам необходимо проработать каждый из случаев, включая тот, где для токена вовсе не нашлось выравнивания.
Мы можем конфигурировать сам метод выравнивания, передаваемый в simalign, выбирая более предпочтительный. Несмотря на это, для улучшения результатов нам стоит самостоятельно перепроверять и дорабатывать полученные результаты на основании собственных эмбеддингов.
После получения словаря соответствий создадим делегирующий метод обработки передаваемых токенов по типу их выравнивания:
def process_alignment(self, idx_src: int) -> tuple[float, list[Token], list[Token]]:
if idx_src not in self.src_to_trg:
return self.treat_not_aligned_token(idx_src)
to_trg_len = len(self.src_to_trg[idx_src])
to_src_len = len(self.trg_to_src[self.src_to_trg[idx_src][0]])
if to_trg_len == 1 and to_src_len == 1:
return self.one_to_one(idx_src)
if to_trg_len == 1 and to_src_len > 1:
return self.many_to_one(idx_src)
if to_trg_len > 1 and all(len(self.trg_to_src[x]) == 1 for x in self.src_to_trg[idx_src]):
return self.one_to_many(idx_src)
return self.many_to_many(idx_src)
В первом случае мы сразу переходим к ручному выравниванию, и здесь допустим только выравнивание один-к-одному, так как более широкий поиск будет чаще приводить к ошибочному сопоставлению. Лучше один раз пропустить слово, чем выдать неверный результат.
def treat_not_aligned_token(self, idx_src: int) -> tuple[float, list[Token], list[Token]]:
unaligned_trg_tokens = self._filter_aligned(self.tokens_trg, self.trg_to_src.keys(), reverse=True)
best_match_idx, best_score = self._find_best_match(unaligned_trg_tokens, self.tokens_src[idx_src])
return best_score, [self.tokens_src[idx_src]], [self.tokens_trg[best_match_idx]]
Вспомогательные методы _filter_aligned
и _find_best_match
пригодятся нам ещё не раз. _filter_aligned
с флагом reverse
вернёт словарь токенов по их индексам из предложения перевода, у которых не нашлось ни одной пары в выравнивании от simalign. Для неопределённых токенов поиск только среди таких же неопределённых.
def _filter_aligned(tokens: list[Token], alignment: list[int] | tuple[int], reverse=False) -> dict[int, Token]:
alignment = set(alignment)
if reverse:
return {idx: token for idx, token in enumerate(tokens) if idx not in alignment or token.is_punct}
return {idx: token for idx, token in enumerate(tokens) if idx in alignment or token.is_punct}
Также мы добавляем в вывод пунктуационные токены, для более простой работы в одном из следующих методов.
В _find_best_match
перебираем токены из полученного набора, игнорируя пунктуационные, и по косинусному расстоянию определяем оценку соответствия.
def _find_best_match(self, checkable_tokens: dict[int, Token], control_token: Token) -> tuple[int | None, float]:
best_match_idx, best_score = None, float('-inf')
for idx, token in checkable_tokens.items():
if token.is_punct:
continue
score = self._cosine_similarity(token._.embedding, control_token._.embedding)
if score > best_score:
best_score = score
best_match_idx = idx
return best_match_idx, best_score
По этой оценке в Sentence
будем принимать или отклонять токены для нашего финального результата. А пока перейдём к случаям, где simalign нашёл выравнивания.
Простейший для нас метод – один-к-одному:
def one_to_one(self, idx_src: int) -> tuple[float, list[Token], list[Token]]:
idx_trg = self.src_to_trg[idx_src][0]
if self.tokens_trg[idx_trg].is_punct:
return float('-inf'), [], []
score = self._cosine_similarity(self.tokens_src[idx_src]._.embedding, self.tokens_trg[idx_trg]._.embedding)
return score, [self.tokens_src[idx_src]], [self.tokens_trg[idx_trg]]
Проверяем отсутствие ложного выравнивания к пунктуационному токену, к сожалению и такое может быть, после чего просто возвращаем рассматриваемые токены с косинусным расстоянием их эмбеддингов.
В случаях с множественным соответствием с одной из сторон, нам необходимо добавить ещё одно упрощение, чтобы уменьшить количество ошибочных соответствий, а именно – мы не будем рассматривать токены, которые не образуют единую последовательность в изначальном предложении. Подобное не ложное выравнивание – крайне редкий случай, и пренебречь им будет даже правильно. Для этой фильтрации определим ещё один вспомогательный метод, который будет пытаться расширять последовательность в две стороны от опорного токена:
def _get_token_sequence(tokens: dict[int, Token], idx: int) -> list[Token]:
if not tokens:
return []
seq_tokens = [tokens[idx]]
for i in range(idx - 1, -1, -1):
if i not in tokens:
break
if tokens[i].is_punct:
continue
seq_tokens.insert(0, tokens[i])
for i in range(idx + 1, int(max(tokens.keys())) + 1):
if i not in tokens:
break
if tokens[i].is_punct:
continue
seq_tokens.append(tokens[i])
return seq_tokens
Пунктуационные символы не прерывают последовательность, для чего мы их и сохраняли в _filter_aligned()
, так как этот случай истинного выравнивания более реальный, хотя данный пункт и дискуссионный. Теперь можем реализовать новые методы:
def one_to_many(self, idx_src: int) -> tuple[float, list[Token], list[Token]]:
checkable_tokens = self._filter_aligned(self.tokens_trg, self.src_to_trg[idx_src])
best_match_idx, best_score = self._find_best_match(checkable_tokens, self.tokens_src[idx_src])
if best_match_idx is None:
return float('-inf'), [], []
seq_tokens_trg = self._get_token_sequence(checkable_tokens, best_match_idx)
return best_score, [self.tokens_src[idx_src]], seq_tokens_trg
def many_to_one(self, idx_src: int) -> tuple[float, list[Token], list[Token]]:
idx_trg = self.src_to_trg[idx_src][0]
checkable_tokens = self._filter_aligned(self.tokens_src, self.trg_to_src[idx_trg])
best_match_idx, best_score = self._find_best_match(checkable_tokens, self.tokens_trg[idx_trg])
if best_match_idx is None:
return float('-inf'), [], []
seq_tokens_src = self._get_token_sequence(checkable_tokens, best_match_idx)
return best_score, seq_tokens_src, [self.tokens_trg[idx_trg]]
Оставляем только те токены, к которым нашлось выравнивание, находим лучшее соответствие, от него пытаемся выстроить последовательность в две стороны по тем же выровненным токенам.
И последний случай – многие-ко-многим. Здесь общая логика будет той же, но с парой дополнений. Первым делом мы двойным перебором находим сильнейшую пару из всех соответствий, после чего фильтруем и строим последовательность для каждого из языков. В процессе мы также запоминаем все рассмотренные пары, чтобы при вызове из следующих токенов мы не добавили новый результат, который включает в себя уже рассмотренный.
def many_to_many(self, idx_src: int) -> tuple[float, list[Token], list[Token]]:
best_src, best_trg, best_score = None, None, float('-inf')
for idx_trg in self.src_to_trg[idx_src]:
src_by_trg = tuple(self.trg_to_src[idx_trg])
if (idx_trg, src_by_trg) in self.seen:
continue
self.seen.add((idx_trg, src_by_trg))
checkable_tokens = self._filter_aligned(self.tokens_src, src_by_trg)
curr_match, curr_score = self._find_best_match(checkable_tokens, self.tokens_trg[idx_trg])
if curr_score > best_score:
best_src = curr_match
best_trg = idx_trg
best_score = curr_score
if best_src is None:
return float('-inf'), [], []
checkable_tokens_src = self._filter_aligned(self.tokens_src, self.trg_to_src[best_trg])
checkable_tokens_trg = self._filter_aligned(self.tokens_trg, self.src_to_trg[best_src])
seq_tokens_src = self._get_token_sequence(checkable_tokens_src, best_src)
seq_tokens_trg = self._get_token_sequence(checkable_tokens_trg, best_trg)
return best_score, seq_tokens_src, seq_tokens_trg
Рассмотрев все случаи мы можем завершить обработку в Sentence
, где уже выполнили ряд проверок и дошли до получения лучшего выравнивания от текущего токена. После получения списков токенов и оценки их соответствия, уберём начальные артикли по уже знакомой проверке 'Art' in token.morph.get('PronType')
из обоих списков, куда они могли пролезть через расширение последовательности от других токенов, после этого проверим проходит ли лучшая оценка минимальный порог (конфигурируется пользователем), и по количеству оставшихся токенов языка оригинала добавим результат в LemmaTrie
, если len > 1
, или в LemmaDict
, сохранив его и в текущий вывод.
for idx_src, token_src in enumerate(self.tokens_src):
if …
…
score, seq_tokens_src, seq_tokens_trg = self.aligner.process_alignment(idx_src)
seq_tokens_src = list(dropwhile(lambda t: 'Art' in t.morph.get('PronType'), seq_tokens_src))
seq_tokens_trg = list(dropwhile(lambda t: 'Art' in t.morph.get('PronType'), seq_tokens_trg))
if not seq_tokens_src or not seq_tokens_trg:
continue
if score < config.min_align_weight:
self.possible_result.append((round(score, 2), seq_tokens_src, seq_tokens_trg))
logging.debug(f'Rejected after alignment: {score}, {seq_tokens_src}, {seq_tokens_trg}')
continue
logging.debug(f'Approved after alignment: {score}, {seq_tokens_src}, {seq_tokens_trg}')
if len(seq_tokens_src) == 1:
self.treat_dict_entity(seq_tokens_src, seq_tokens_trg)
else:
translation = ' '.join(token.text for token in seq_tokens_trg)
entity = self.container.lemma_trie.add(
[token.lemma_ for token in seq_tokens_src], Entity(translation)
)
self.treat_trie_entity(entity, seq_tokens_src, seq_tokens_trg)
Осталось лишь вернуть полученный результат из Sentence
в основной цикл обработки и генерации. Пока возвращать будем в виде списков, с пометками, что они из себя представляют, что поможет нам на этапе синтеза. Нам важно различать непосредственно сами предложения и переведённый словарь.
Переведённое предложение добавим в вывод только в том случае, если новых или необходимых к закреплению слов больше пяти, либо больше четверти от длины всех токенов предложения, но не меньше двух. Чтобы улучшить текстовый вывод, также отдельно отбросим от оригинального предложения возможный хвост переносов строк, и вернём его после словаря и возможного перевода.
def get_results(self) -> list[tuple[int, str | list[tuple[str, str]]]]:
stripped = self.sentence.rstrip()
tail = self.sentence[len(stripped) :]
result = [(0, stripped)]
if self.result:
result.append((2, self.result))
if self._translation_line_condition:
result.append((1, self.translated_sentence))
if tail:
result.append((3, tail))
return result
@property
def _translation_line_condition(self) -> bool:
n = len(self.result)
quarter = len(self.tokens_src) // 4
return n > 5 or 2 < n > quarter
В конце основного цикла обработки увеличим общие счётчики токенов и предложений, которые будут сохраняться от сессии к сессии, для правильного подсчёта дистанции повторения.
self.container.entity_counter = sentence_obj.entity_counter
self.container.sentence_counter += 1
self.output_text.extend(sentence_obj.get_results())
❯ Синтез речи
Осталась задача синтеза книги с нашими переводами. Реализуем её выполнение с помощью различных синтезаторов, также на выбор пользователя.
Простейший из известных синтезаторов – gTTS. Данная библиотека является обёрткой для обращения к открытой и бесплатной части API Google Translate. Ничего не конфигурируется, никаких явных лимитов, на выходе получаем .mp3.
class GTTSProvider:
def __init__(self) -> None:
self.model = gTTS
def synthesize(self, text: str, lang: str) -> AudioSegment:
audio_buffer = BytesIO()
tts = self.model(text=text, lang=lang)
tts.write_to_fp(audio_buffer)
audio_buffer.seek(0)
return AudioSegment.from_mp3(audio_buffer)
Среди локальных инструментов лучшим из опробованных стал CoquiTTS. Данная библиотека обладает действительно широким функционалом, но нас сегодня интересует только прямой синтез. Для этого нам необходимо лишь выбрать нужную мультиязычную модель, например tts_models/multilingual/multi-dataset/xtts_v2
, и голос озвучки. Код также минимальный, нам нужны только основные методы библиотеки:
class CoquiTTSProvider:
def __init__(self) -> None:
self.model = TTS(model_name=config.synth_model)
self.voice = config.voice_src
def synthesize(self, text: str, lang: str) -> AudioSegment:
audio_buffer = BytesIO()
self.model.tts_to_file(text=text, file_path=audio_buffer, speaker=self.voice, language=lang)
audio_buffer.seek(0)
return AudioSegment.from_wav(audio_buffer)
Третьим же рассмотренным вариантом будет Google Cloud, к которому мы уже обращались при создании перевода. Здесь у нас куда больше возможностей к прямому конфигурированию, внушительный список моделей и голосов и ещё пара приятных преимуществ. Конечно это потенциально платный инструмент, но существующего бесплатного лимита на always free tier для личных нужд хватит с головой – каждый месяц 4 миллиона входных символов для моделей стандартного синтеза, и по 1 миллиону для генераций с использованием WaveNet, Neural2 и Journey моделей [считаются раздельно, и в сумме это 7М символов?]. Для работы с этим API, нужно будет создать GC проект и настроить доступы, но это вне рамок статьи. В рамках – метод получения синтезированного текста:
class GoogleCloudTTSProvider:
def __init__(self) -> None:
self.client = texttospeech.TextToSpeechClient()
def synthesize(self, text: str, lang: str, speed: float) -> AudioSegment:
input_text = texttospeech.SynthesisInput(text=text)
audio_config = texttospeech.AudioConfig(
audio_encoding=texttospeech.AudioEncoding.LINEAR16, speaking_rate=speed
)
voice_name = config.voice_src if lang == config.source_lang else config.voice_trg
lang = voice_name[:5]
voice = texttospeech.VoiceSelectionParams(language_code=lang, name=voice_name)
response = self.client_short.synthesize_speech(input=input_text, voice=voice, audio_config=audio_config)
audio_buffer = BytesIO()
audio_buffer.write(response.audio_content)
audio_buffer.seek(0)
return AudioSegment.from_wav(audio_buffer)
Не очень удобной особенностью является то, что подобная генерация возможна только для текстов <5К байт. Для более объёмных текстов результат синтеза будет сохранён в Google Cloud Storage, откуда нам придётся его дополнительно вытаскивать. Адаптируем код под эти реалии:
def __init__(self) -> None:
self.client_short = texttospeech.TextToSpeechClient()
self.client_long = texttospeech.TextToSpeechLongAudioSynthesizeClient()
self.storage_client = storage.Client()
def synthesize(self, text: str, lang: str, speed: float) -> AudioSegment:
…
if len(text.encode('utf-8')) > 4990:
logging.debug('GC TTS: Long audio synthesis')
return self._synthesize_long(input_text, voice, audio_config)
return self._synthesize_short(input_text, voice, audio_config)
def _synthesize_short(self, input, voice, audio_config) -> AudioSegment:
response = self.client_short.synthesize_speech(input=input, voice=voice, audio_config=audio_config)
audio_buffer = BytesIO()
audio_buffer.write(response.audio_content)
audio_buffer.seek(0)
return AudioSegment.from_wav(audio_buffer)
def _synthesize_long(self, input, voice, audio_config) -> AudioSegment:
bucket = self.storage_client.bucket('name')
blob = bucket.blob('audio_output.wav')
if blob.exists():
blob.delete()
parent = f'projects/{config.google_cloud_project_id}/locations/{config.google_cloud_project_location}'
output_gcs_uri = 'gs://name/audio_output.wav'
request = texttospeech.SynthesizeLongAudioRequest(
input=input, voice=voice, audio_config=audio_config, parent=parent, output_gcs_uri=output_gcs_uri
)
operation = self.client_long.synthesize_long_audio(request=request)
result = operation.result(timeout=300)
audio_buffer = BytesIO()
blob.download_to_file(audio_buffer)
audio_buffer.seek(0)
return AudioSegment.from_wav(audio_buffer)
И здесь ещё раз, в последний раз, нужно вспомнить о месячных ограничениях: бесплатный лимит хранения в 5Гб для GCS действует только на хранилища в регионах US-WEST1
, US-CENTRAL1
, и US-EAST1
. Бесплатный лимит передачи данных – 100Гб. Для личных нужд, опять же, более чем достаточно.
Отдельные синтезаторы определили, теперь подвяжем их к нашей задаче. Основных методов синтеза у нас два: простой – что пришло, то и передали дальше; и синтез с учётом типа текста – по интам, которые мы отдали из Sentence.get_results()
:
class SpeechSynthesizer:
def __init__(self) -> None:
if not config.speech_synth:
return
match config.synth_provider:
case 'CoquiTTS':
self.model = CoquiTTSProvider()
case 'gTTS':
self.model = GTTSProvider()
case 'GoogleCloud':
self.model = GoogleCloudTTSProvider()
case _:
raise ValueError(f'Unknown synth_provider value ({config.synth_provider}).')
def synthesize(self, text: str, lang: str, speed: float = 1.0) -> AudioSegment:
if text:
text = re.sub(r'^P{L}+|[P{L}?!.]+$', '', text)
if not text:
return False
return self.model.synthesize(text, lang, speed)
def synthesize_by_parts(self, parts: list[tuple[int, str | list[tuple[str, str]]]], speed: float) -> AudioSegment:
audio_buffer = self.silent(0)
for flag, value in parts:
match flag:
case 0:
audio = self.synthesize(value, config.source_lang, speed)
case 1:
audio = self.synthesize(value, config.target_lang, speed)
case 2:
audio = self.synthesize_by_parts(
[(i, v) for val in value for i, v in enumerate(val)], config.vocabulary_pronunciation_speed
)
case _:
continue
if audio:
audio_buffer += audio
audio = None
return audio_buffer
@staticmethod
def save_audio(audio: AudioSegment, name: str) -> None:
audio.export(config._root_dir / f'{name}.wav', format='wav')
Входной текст на «простом» синтезе мы очищаем от незначащих начальных и конечных символов, чтобы снизить объём обращения к конкретному синтезатору. Для работы с аудио используем AudioSegment
из pydub.
На вход в синтез по частям, после всех Sentence.get_results()
, нам приходят оригинальные предложения как 0
, переведённые предложения как 1
, и словарь, под цифрой 2
. Последний мы преобразуем в новый список из пронумерованных нулями и единицами слов или единых выражений, и обрабатываем их через повторный вызов этого же метода.
Также предусмотрим раздельную адаптацию скорости речи для предложений и словаря. Из рассмотренных синтезаторов генерацию с пользовательской скоростью поддерживает лишь GC, потому для gTTS и CoquiTTS нужно добавить декоратор с вызовом FFmpeg (допустимые пределы скорости 0.5-2):
def adjust_audio_speed(func):
@wraps(func)
def wrapper(self, text: str, lang: str, speed: float) -> AudioSegment:
audio: AudioSegment = func(self, text, lang)
if speed != 1:
adjusted_audio = BytesIO()
audio.export(adjusted_audio, format='wav', parameters=['-filter:a', f'atempo={speed}'])
adjusted_audio.seek(0)
audio = AudioSegment.from_wav(adjusted_audio)
return audio
return wrapper
class GTTSProvider:
@adjust_audio_speed
def synthesize(…):
…
class CoquiTTSProvider:
@adjust_audio_speed
def synthesize(…):
…
И раз уж мы работаем с Google Cloud TTS, стоит также воспользоваться ещё одним его преимуществом – поддержкой формата SSML. Это язык разметки текста для синтеза речи включающий полтора десятка тегов.
Нам пригодятся самые базовые структурные (абзац, предложение и паузация), а также конфигурирующие звучание теги, такие как emphasis
придающий акцентуацию определённым участкам текста; prosody
, для точной настройки высоты, скорости и громкости звучания; а также voice
, для смены спикера во время синтеза, без отдельных обращений, что для нашей задачи мультиязычного синтеза весьма кстати. Нам не нужно маркировать каждый участок текста конкретным диктором, мы всё так же передаём основного, который используется для озвучки, но для отмеченных voice
участков будет применяться заданный тегом. Стандарт несколько шире, и там есть ещё весьма занятные пункты.
Что же, создадим с помощью Jinja2 общий шаблон выходного ssml для каждого обработанного предложения:
<p>
<s>
{% if sentence_speed != 1 %}
<prosody rate="{{ (sentence_speed * 100) | int }}%">{{ sentence }}</prosody>
{% else %}
{{ sentence }}
{% endif %}
</s>
{% if result %}
<break strength="strong"/>
<s>
<emphasis level="moderate">
<prosody {% if vocabulary_speed != 1 %}rate="{{ (vocabulary_speed * 100) | int }}%" {% endif %}pitch="+10%" volume="-5dB">
{% for src, trg in result %}
{{ src }}-<voice name="{{ voice_trg }}">{{ trg }}</voice>
{% if not loop.last %}<break strength="medium"/>{% endif %}
{% endfor %}
</prosody>
</emphasis>
</s>
{% endif %}
{% if translated_sentence %}
<break strength="strong"/>
<s>
<voice name="{{ voice_trg }}">
{% if sentence_speed != 1 %}
<prosody rate="{{ (sentence_speed * 100) | int }}%">{{ translated_sentence }}</prosody>
{% else %}
{{ translated_sentence }}
{% endif %}
</voice>
</s>
{% endif %}
</p>
<break strength="x-strong"/>
Шаблон конечно минифицируем, чтобы уменьшить объём обращений к облачному синтезатору. Теперь добавим обработку ssml на всём пути:
#main.py
class Main:
def __init__(self) -> None:
…
self.output_ssml: list[str] = ['<speak>']
…
def process(self, text: str) -> None:
…
for sentence_text in sentences:
…
self.output_text.extend(sentence_obj.get_results())
self.output_ssml.append(sentence_obj.get_rendered_ssml())
self.output_ssml.append('</speak>')
if config.speech_synth:
if config.use_ssml:
self.output_audio = self.container.synthesizer.synthesize(
''.join(self.output_ssml), config.source_lang, 1
)
else:
self.output_audio = self.container.synthesizer.synthesize_by_parts(
self.output_text, config.sentence_pronunciation_speed
)
self.container.synthesizer.save_audio(self.output_audio, 'multilingual_output')
#sentence_processing.py
from jinja2 import Template
class Sentence
def __init__(…):
…
with open(config._root_dir / 'src' / 'templates' / 'template.min.ssml', 'r', encoding='utf-8') as file:
self.template = Template(file.read())
def get_rendered_ssml(self) -> str:
if not config.use_ssml:
return ''
translated = self.translated_sentence.rstrip() if self._translation_line_condition else ''
return self.template.render(
sentence=self.sentence.rstrip(),
translated_sentence=translated,
result=self.result,
voice_trg=config.voice_trg,
sentence_speed=config.sentence_pronunciation_speed,
vocabulary_speed=config.vocabulary_pronunciation_speed,
)
#synthesis.py
class GoogleCloudTTSProvider:
def synthesize(self, text: str, lang: str, speed: float) -> AudioSegment:
if config.use_ssml:
input_text = texttospeech.SynthesisInput(ssml=text)
audio_config = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.LINEAR16)
voice_name = config.voice_src
else:
input_text = texttospeech.SynthesisInput(text=text)
audio_config = texttospeech.AudioConfig(
audio_encoding=texttospeech.AudioEncoding.LINEAR16, speaking_rate=speed
)
voice_name = config.voice_src if lang == config.source_lang else config.voice_trg
…
И прежде чем перейти к завершению, можно попытаться реализовать ещё одно улучшение.
❯ Переиспользование звучания
При синтезе словаря мы передаём к озвучке отдельные слова, что, в отрыве от контекста, значительно меняет произношение, а в случае с омографами и вовсе порождает дополнительные ошибки. К тому же, синтез отдельных слов значительно хуже по качеству, нежели полноценных предложений, или, хотя бы, словосочетаний.
Если бы мы могли переиспользовать слова из уже сгенерированного целого предложения, то это бы отразилось и на качестве озвучки словаря, и уменьшило бы количество обращений к синтезатору. Потому давайте опишем ещё один путь, на котором мы воспользуемся MFA. Данный инструмент поможет выровнять промежутки из синтезированного предложения к словам из переданного текста, что позволит нам присвоить каждому токену его звучание ещё на этапе обработки токенов.
До этого мы формировали текстовое представление нашего выходного текста, и только после обращались к синтезатору. Использование MFA меняет этот путь, так как на этапе формирования результата нам необходимо уже иметь определённое звучание токенов, потому передавать предложения на озвучку мы будем в начале обработки. Это также значит, что на этом пути мы не будем использовать ssml и вновь будем компоновать выходное аудио вручную.
У MFA нет официальной python библиотеки, потому взаимодействовать будем через subprocess
и временные файлы. Также предварительно понадобится загрузить модели и словари для заданных языков и указать к ним путь в файле конфигурации.
def _align_audio(text: str, audio_buffer: 'AudioSegment', lang: str) -> str:
temp = config._root_dir / 'temp'
audio_buffer.export(temp / 'temp.wav', format='wav')
with open(temp / 'temp.txt', 'w', encoding='utf-8') as f:
f.write(text)
dict_path = Path(config.mfa_dir) / 'dictionary' / f'{lang}_mfa.dict'
model_path = Path(config.mfa_dir) / 'acoustic' / f'{lang}_mfa.zip'
command = ['mfa', 'align', '--clean', '--single_speaker', str(temp), dict_path, model_path, str(temp)]
subprocess.run(command, check=True)
На выходе мы получим .TextGrid файл с интервалами слов и отдельных фонем. Через tgt переберём все слова и вернём их список с найденными фрагментами аудио, добавив конфигурируемое смещение:
def _split_audio_by_alignment(audio: 'AudioSegment') -> list[dict]:
textgrid = tgt.io.read_textgrid(config._root_dir / 'temp' / 'temp.TextGrid')
segments = []
word_tier = textgrid.get_tier_by_name('words')
for interval in word_tier.intervals:
if interval.text.strip():
start_time = max(0, interval.start_time * 1000 - config.mfa_start_shift_ms)
end_time = min(len(audio), interval.end_time * 1000 + config.mfa_end_shift_ms)
segments.append({'text': interval.text, 'audio': audio[start_time:end_time]})
return segments
Теперь можем присвоить эти фрагменты конкретным токенам, конкатенируя звучания для агрегированных:
def _process_mfa_alignment(self) -> None:
self._align_audio(self.parent.sentence, self.sentence_audio, config.source_full_lang)
segments_src = self._split_audio_by_alignment(self.sentence_audio)
self._align_audio(self.parent.translated_sentence, self.translated_audio, config.target_full_lang)
segments_trg = self._split_audio_by_alignment(self.translated_audio)
for segments, tokens in ((segments_src, self.tokens_src), (segments_trg, self.tokens_trg)):
i, j = 0, 0
while i < len(segments) and j < len(tokens):
if segments[i]['text'] in tokens[j].text.lower():
tokens[j]._.audio += segments[i]['audio']
i += 1
else:
j += 1
Пользовательский атрибут Token._.audio
с определённым значением по умолчанию задаём в начале исполнения программы, если необходимо по конфигурации:
#main.py
class Main:
def __init__(self):
…
if config.speech_synth and config.use_mfa:
self.output_audio: 'AudioSegment' = self.container.synthesizer.silent(0)
if not Token.has_extension('audio'):
Token.set_extension('audio', default=self.container.synthesizer.silent(200))
Всё это инкапсулируем в собственный класс, где сразу начинаем формировать финальный аудио-фрагмент обработанного предложения:
class MFAligner:
def __init__(self, parent: 'Sentence') -> None:
if not config.speech_synth or not config.use_mfa:
self.idle = True
return
…
self.sentence_audio = self.synth.synthesize(self.parent.sentence, config.source_lang, speed)
self.translated_audio = self.synth.synthesize(self.parent.translated_sentence, config.target_lang, speed)
self.output_audio = self.sentence_audio[:]
self._process_mfa_alignment()
При каждом добавлении нового результата, в процессе работы методов Sentence
, добавляем звучание результирующих токенов к выводу:
def append_mfa_audio_to_output(self, result_src: list['Token'], result_trg: list['Token'] | str) -> None:
if self.idle:
return
for token in result_src:
self.output_audio += token._.audio
self.output_audio += self.synth.silent(200)
if isinstance(result_trg, list):
for token in result_trg:
self.output_audio += token._.audio
else:
translation_audio = self.synth.synthesize(result_trg, config.target_lang)
self.output_audio += translation_audio
Если перевод пришёл на вход не в виде списка токенов, а в виде обычной строки, обращаемся к синтезатору, как и на основном пути. Вызываем метод в точках добавления результата – treat_dict_entity
и treat_trie_entity
класса Sentence
, и в конце определим закрывающий обработку метод:
#sentence_processing.py
class Sentence:
def __init__(…):
…
self.mfa_aligner = MFAligner(self) # -> _process_mfa_alignment()
def treat_dict_entity(…):
…
if lemma.check_repeat(self.entity_counter) and entity.check_repeat(self.entity_counter):
…
self.mfa_aligner.append_mfa_audio_to_output(tokens_src, tokens_trg)
self.skip |= set(tokens_src)
def treat_trie_entity(…):
if entity.check_repeat(self.entity_counter):
entity.update(self.entity_counter)
self.result.append((' '.join(token.text.lower() for token in tokens_src), entity.translation))
self.mfa_aligner.append_mfa_audio_to_output(tokens_src, tokens_trg if tokens_trg else entity.translation)
self.skip |= set(tokens_src)
def get_result_mfa_audio(self) -> 'AudioSegment':
return self.mfa_aligner.get_result_audio(self._translation_line_condition)
#mfa_aligner.py
class MFAligner:
def get_result_audio(self, additional_translation: bool = False) -> 'AudioSegment':
if additional_translation:
self.output_audio += self.translated_audio
return self.output_audio
Теперь в Мain
, при обработке каждого предложения будем добавлять к аудио-выводу обработанное предложение, а формирование финального аудио дополнительно обернём в проверку пути обработки:
class Main:
def process(self, text: str) -> None:
…
for sentence_text in sentences:
…
if config.speech_synth and config.use_mfa:
self.output_audio += sentence_obj.get_result_mfa_audio()
if config.speech_synth:
if not config.use_mfa:
if config.use_ssml:
self.output_audio = self.container.synthesizer.synthesize(
''.join(self.output_ssml), config.source_lang, 1
)
else:
self.output_audio = self.container.synthesizer.synthesize_by_parts(
self.output_text, config.sentence_pronunciation_speed
)
self.container.synthesizer.save_audio(self.output_audio, 'multilingual_output')
Готово!
И последним штрихом, в конце всей обработки основного цикла, не забудем сохранить наши структуры и позиции, с небольшой защитой от случайной перезаписи, и определим метод их загрузки для следующих генераций:
def save_structures(self) -> None:
filepath = config._root_dir / f'{config.output_storage_filename}.pkl'
if filepath.exists():
old_filepath = filepath.with_suffix('.pkl.old')
if old_filepath.exists():
old_filepath.unlink()
filepath.rename(old_filepath)
with open(filepath, 'wb') as file:
pickle.dump((self.lemma_dict, self.lemma_trie, self.entity_counter, self.sentence_counter), file)
def load_structures(self) -> None:
with open(config._root_dir / f'{config.input_storage_filename}.pkl', 'rb') as file:
self.lemma_dict, self.lemma_trie, self.entity_counter, self.sentence_counter = pickle.load(file)
❯ Итоги и результаты
Что же, счётчик строк уже перевалил за тысячу, потому стоит кратко пробежаться по тому, что мы вообще здесь сделали:
- выполнили перевод каждого предложения, предоставив пользователю выбор конкретного переводчика
- …или же сопоставили текст с заданным монолитным переводом, сделав заготовку параллельного текста
- для каждого предложения на двух языках подготовили токены, добавив к ним эмбеддинги и решив вопрос совместимости токенизаторов
- раздельно реализовали структуры хранения однословных и многословных сущностей, с двумя уровнями хранения, по леммам и конкретным формам, для более качественного вывода с минимизацией повторений
- описали центральную логику обработки токенов, где основную часть заняла работа с выравниваниями, сдобренная собственными перепроверками и дополнениями
- на основе этого сформировали вывод для каждого предложения, состоящий из оригинального предложения, словаря новых и закрепляемых выражений и слов, и перевода полного предложения, если оно необходимо
- отдельно предусмотрели вывод в ssml формате, по определённому нами шаблону, для поддерживающих данный формат синтезаторов
- реализовали вариативный синтез речи с использованием различных инструментов и конфигураций
- отдельно проработали путь с распознаванием и переиспользованием аудио-фрагментов синтезированного предложения в конкретных токенах, для унификации звучания и снижения нагрузки на синтезатор
- сохранили наши структуры для следующих генераций
Так мы прошлись по всем важным точкам приложения оставив методы не влияющие на логику и общую структуру за скобками. Полный код приложения всё так же доступен на гитхабе. А сейчас посмотрим, что из этого получилось:
Здесь представлены не лучшие результаты, а просто случайный текст с перебором различных параметров с пустым словарём. При озвучке использовались значения скорости 0.9 для предложений и 0.7 для словаря. Остальные изменяемые параметры представлены в видео.
Первым делом в глаза, а точнее в уши, бросается синтез одиночных слов. Это большая проблема, ибо их озвучка – обязательная часть нашего приложения. Но как не передавая контекст избавиться от искажений и восходящих акцентуаций на «одиночках» пока не совсем понятно. С MFA
интересный подход, но его использование невероятно увеличивает время обработки, да и вырванные слова сопровождаются характерным «дёрганьем». Можно было бы с ssml генерировать текст с явными паузами через <break/>
между словами, чтобы их фрагменты были чище, но синтезатор озвучивает подобный текст с паузацией как отдельные слова, возвращая нас к начальным проблемам. Так что вопрос улучшения звучания одиночных слов пока открытый, как и комментарии ниже, спасибо.
Дальше – выравнивания. Здесь конечно, далеко до идеала, но в целом результат внушает доверие. Да, есть ошибки в обе стороны, как с пропусками очевидных пар, так и с ложным сопоставлением непонятно чего, но это правится подбором конфигурации и повышением порога соответствия. К тому же эта часть полностью открыта для доработок, например, можно подумать о возможных постпроверках или совмещении методов выравнивания.
И теперь о центральной логике – изучение и закрепление слов. Здесь никаких сложностей, она просто работает, как и должна. Дальнейшие результаты уже полностью зависят от пользователя, который сам нарабатывает словарь и устанавливает собственные значения интервалов повторения.
Вряд ли кто-то всерьёз будет сравнивать звучания, но ниже представлены две версии результата на коротком рассказе Дино Буццати «Девушка, летящая вниз». Текст не подбирался по простоте, больше для демонстрации. Для первых генераций для прослушивания лучше взять тексты с более короткими предложениями, или предварительно обучить словарь без синтеза речи на собственной выборке произведений. В качестве параметров конфигурации использовались alignment=itermax, aggregator=attention, weight=0.5
, с переводом и озвучкой через GC. Всего в рассказе 1560 слов.
В данных примерах на пустом пользовательском словаре в результат прошло 555 новых сущностей, в то время как на словаре обученном на одном-единственном «Моби Дике» уже 353 (новых+закрепляемых). Конечно, первые аудио будут объёмными, но эта разница показывает, как с каждой новой сгенерированной книгой отвлечений на перевод будет всё меньше, а пользовательский словарь всё шире, что позволяет считать общую цель сегодняшнего текста достигнутой.
Мы начали с простой идеи – совместить параллельные переводы и аудиокниги, без постоянного отвлечения и утраты погружения. Система частичного перевода, которую мы создали, нацелена на то, чтобы предоставить каждому пользователю индивидуализированный, адаптивный опыт.
В этом подходе пользователь оказывается не просто пассивным слушателем, а активным участником процесса. Словарь постепенно расширяется, тексты становятся более понятными, а неизвестные слова плавно интегрируются в общий контекст. Это создаёт ощущение прогресса, и с каждой новой книгой потребность в помощи сокращается, а уровень уверенности растёт.
Надеюсь, было интересно, а лучше, если будет полезно. За сим всё, спасибо за внимание.
Никак не связан с создателем канала, кроме подписки на него. Просто хочется помочь новым слушателям и автору найти друг друга. Hidden gem, imo. Такие дела.
- 🟢 Цикл статей о клеточных автоматах (последняя статья)
- 🟢 История моделирования лесных пожаров
- 🟠 REcollapse: фаззинг с использованием unicode-нормализации
- 🔵 Хватит использовать [a-zа-яё]: правильная работа с символами и категориями Unicode в регулярных выражениях
- 🟢 Краткая история календаря и фантазии о шестидневной неделе
- 🟢 Пройти LeetCode за год: экскурсия по сайту и roadmap
- 🟢 Абсолютная мультиязычность и типографика на любой раскладке
Автор: TLHE