- PVSM.RU - https://www.pvsm.ru -

Прошло много времени с тех пор, как я написал свою последнюю статью по основам RxJS. В комментариях меня попросили показать более сложные примеры, которые могут пригодиться на практике. Поэтому я решил немного разбавить теорию и сегодня мы поговорим про выгрузку файлов.
Что мы будем делать?
Для понимания данной статьи вам потребуются базовые знания RxJS. Что такое Observable [1], операторы [2], а так-же HOO операторы [3]
Не будем тянуть кота за хвост и сразу перейдем к делу!
Для начала нам потребуется сервер, который может принимать запросы на загрузку файлов. Для этого может подойти любой сервер, я для статьи буду использовать node.js в связке с express и multer [4]:
const express = require("express");
const multer = require("multer");
const app = express();
const upload = multer({ dest:"files" });
app.post("/upload", upload.single("file"), function (req, res) {
const { file } = req;
if (file) {
res.send("File uploaded successfully");
} else {
res.error("Error");
}
});
app.listen(3000);
Теперь создадим html страничку, на которой мы разместим все необходимые элементы:
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>File uploading</title>
</head>
<body>
<label for="file">load file: <input id="file" type="file"></label>
<button id="upload">Upload</button>
<button id="cancel">Cancel</button>
<div id="progress-bar" style="width: 0; height: 2rem; background-color: aquamarine;"></div>
<script src="index.js" type="text/javascript"></script>
</body>
</html>
Сейчас на страничке у нас есть 4 элемента, с которыми пользователь будет взаимодействовать:
В самом конце тега body я добавил ссылку на скрипт index.js, который нам тоже нужно будет создать:
// Ссылки на элементы, с которыми мы будем взаимодействовать
const input = document.querySelector('#file');
const uploadBtn = document.querySelector('#upload');
const progressBar = document.querySelector('#progress-bar');
const cancelBtn = document.querySelector('#cancel');
Выглядеть все это должно примерно так:

Чтобы сказать браузеру, какой файл нужно выбрать, пользователь должен кликнуть по кнопке «Choose file». После этого откроется диалоговое окно вашей операционной системы, где отобразится дерево папок. После выбора файла браузер загрузит всю необходимую информацию о нем.
Как нам понять, что пользователь выбрал файл? Для этого существует событие «change». После срабатывания события мы можем обратиться к массиву files в input’е, куда и будут записаны данные файла.
Как же нам слушать событие «change»? Можно воспользоваться методом addEventListener и работать с ним. Но мы работаем с RxJS, где любое событие может быть представлено как поток:
fromEvent(input, 'change').pipe(
// достаем файл из массива
map(() => input.files[0])
).subscribe({
next: data => console.log(file)
});
Добавим функцию upload, которая будет выгружать файл на сервер. Пока оставим ее тело пустым:
function upload(file) {
console.log(file);
}
Функция upload должна вызываться после нажатия на кнопку uploadBtn:
fromEvent(uploadBtn, 'click').subscribe({
next: () => upload(input.files[0])
});
Сейчас наш код ничем не отличается от того, который бы мы написали, используя addEventListener. Да, он работает, но если мы оставим его таким, то потеряем те преимущества, которые открывает перед нами RxJS.
Что мы можем сделать? Распишем последовательность шагов для выгрузки файла:
Теперь данную последовательность перенесем на код. Но как объединить потоки input’а и uploadBtn? В этом нам поможет оператор switchMap, который позволяет спроецировать один поток на другой:
fromEvent(input, 'change').pipe(
switchMap(() => fromEvent(uploadBtn, 'click')),
map(() => input.files[0])
).subscribe({
next: file => upload(file)
});
Данный код очень похож на последовательность тех инструкций, что мы описали выше. Пользователь выбирает файл, срабатывает switchMap и мы подписываемся на uploadBtn. Но дальше ничего не произойдет.
switchMap пропускает во внешний поток только те значения, которые генерирует fromEvent(uploadBtn, 'click').Чтобы выгрузка файлов началась, нужно выполнить вторую инструкцию, а именно — нажать на uploadBtn. Затем отработает метод map, который извлечет файл из массива, и уже в subscribe произойдет вызов метода upload.
Самое интересное здесь то, что последовательность инструкций не нарушаема. Чтобы сработала функция upload, нужно, чтобы перед этим сработало событие 'change'.
Но все же одна проблема осталась. Пользователь может выбрать файл, а затем отменить свой выбор. И тогда при попытке выгрузки файла, мы передадим в функцию upload — undefined. Чтобы избежать подобной ситуации, нам следует добавить проверку:
fromEvent(input, 'change').pipe(
switchMap(() => fromEvent(uploadBtn, 'click')),
map(() => input.files[0]),
filter(file => !!file)
).subscribe({
next: file => upload(file)
});
Пора реализовать самое сложное — процесс выгрузки. Я буду показывать его на примере работы с xhr, так как fetch, на момент написания статьи, не умеет отслеживать прогресс выгрузки [5].
Вы можете реализовать выгрузку с помощью любой другой библиотеки, например axios или jQuery.ajax.
Так как на серверной стороне я использую multer, то передавать файл мне придется внутри формы(multer принимает данные только в таком формате). Для этого я написал функцию createFormData:
function createFormData(file) {
const form = new FormData();
// кладем файл в поле с названием file
form.append('file', file);
return form;
}
fromEvent(input, 'change').pipe(
switchMap(() => fromEvent(uploadBtn, 'click')),
map(() => input.files[0]),
filter(file => !!file),
map(file => createFormData(file))
).subscribe({
next: data => upload(data)
});
Выгружать форму мы будем через XMLHttpRequest. Нам потребуется создать экземпляр данного объекта и определить у него методы unload и onerror. Первый будет срабатывать, когда выгрузка завершена, второй — когда произошла ошибка.
function upload(data) {
const xhr = new XMLHttpRequest();
// выводим в консоль сообщение об удачной выгрузке файла
xhr.onload = () => console.log('success');
// выводим сообщение об ошибке
xhr.onerror = e => console.error(e);
// открываем соединение
xhr.open('POST', '/upload', true);
// отправляем форму
xhr.send(data);
}
Теперь у нас есть рабочий пример. Но он содержит пару недостатков:
Все потому, что функция upload работает вне потока. Она живет сама по себе. Нам нужно это исправить. Сделаем так, чтобы функция возвращала нам Observable. Тогда мы сможем контролировать выгрузку файлов:
function upload(data) {
return new Observable(observer => {
const xhr = new XMLHttpRequest();
// когда файл будет выгружен, мы сообщаем об этом в поток
// и завершаем его
xhr.onload = () => {
observer.next();
observer.complete();
};
xhr.onerror = e => observer.error(e);
xhr.open('POST', '/upload', true);
xhr.send(data);
// при отписке - отменяем выгрузку
return () => xhr.abort();
});
}
Обратите внимание на возвращаемую внутри Observable стрелочную функцию. Данный метод будет вызван в момент отписки и отменит выгрузку.
Поместим вызов upload в switchMap:
fromEvent(input, 'change').pipe(
switchMap(() => fromEvent(uploadBtn, 'click')),
map(() => input.files[0]),
filter(file => !!file),
map(file => createFormData(file)),
switchMap(data => upload(data))
).subscribe({
next: () => console.log('File uploaded')
});
Теперь, если пользователь нажмет на кнопку выгрузки еще раз, то предыдущий запрос будет отменен, но создастся новый.
У нас еще осталась кнопка calcelBtn. Мы должны реализовать отмену запроса. Здесь нам поможет оператор takeUntil.
takeUntil переводится как “бери пока”. Данный оператор забирает значения из внешнего потока и отдает их дальше по цепочке. До тех пор, пока внутренний поток существует и ничего не генерирует. Как только внутренний поток сгенерирует значение — takeUntil вызовет метод unsubscribe и отпишется от внешнего потока.
Прежде чем добавить оператор, нужно определить, от какого потока мы хотим отписаться. Нас интересует upload, так как необходимо завершить только выгрузку файла, т.е. отписаться от внутреннего потока:
fromEvent(input, 'change').pipe(
switchMap(() => fromEvent(uploadBtn, 'click')),
map(() => input.files[0]),
filter(file => !!file),
map(file => createFormData(file)),
switchMap(data => upload(data).pipe(
// отписываемся только от upload
takeUntil(fromEvent(cancelBtn, 'click'))
))
).subscribe({
next: () => console.log('File uploaded')
});
Осталось добавить progress bar. Для отслеживания прогресса нам понадобится определить метод xhr.upload.onprogress. Данный метод вызывается при возникновении события ProgressEvent. Объект события содержит в себе несколько полезных для нас свойств:
Внесем изменения в функцию upload:
function upload(data) {
return new Observable(observer => {
const xhr = new XMLHttpRequest();
xhr.upload.onprogress = e => {
// высчитываем проценты
const progress = e.loaded / e.total * 100;
observer.next(progress);
};
xhr.onerror = e => observer.error(e);
xhr.onload = () => observer.complete();
xhr.open('POST', '/upload', true);
xhr.send(data);
return () => xhr.abort();
});
}
Теперь upload выплевывает в поток состояние выгрузки. Осталось только написать функцию, которая будет менять свойства style у элемента progressBar:
function setProgressBarWidth(width) {
progressBar.style.width = `${width}%`;
}
fromEvent(input, 'change').pipe(
/* ..
*/
).subscribe({
next: width => setProgressBarWidth(width)
});

Небольшой совет: чтобы ваши файлы локально выгружались не так быстро, включите настройку «Fast 3G» или «Slow 3G» во вкладке «Performance» в Chrome devtools.
Мы получили полноценное рабочее приложение. Осталось добавить пару штрихов. Сейчас при нажатии на кнопку uploadBtn мы отменяем предыдущую выгрузку и начинаем новую. Но у нас уже есть кнопка отмены.
Хочется, чтобы кнопка uploadBtn не реагировала на последующие нажатия, пока мы не выгрузили файл(или пока мы не отменили выгрузку). Что можно предпринять?
Можно вешать атрибут disable, пока процесс выгрузки не завершится. Но есть другой вариант — оператор exhaustMap. Данный оператор будет игнорировать новые значения из внешнего потока, пока внутренний поток не будет завершен. Заменим switchMap на exhaustMap:
exhaustMap(data => upload(data).pipe(
takeUntil(fromEvent(cancelBtn, 'click'))
))
И вот теперь можно считать наше приложение законченным. Немного рефакторинга и получаем финальный вариант:
import { fromEvent, Observable } from "rxjs";
import { map, switchMap, filter, takeUntil, exhaustMap } from "rxjs/operators";
const input = document.querySelector('#file');
const uploadBtn = document.querySelector('#upload');
const progressBar = document.querySelector('#progress-bar');
const cancelBtn = document.querySelector('#cancel');
const fromUploadBtn = fromEvent(uploadBtn, 'click');
const fromCancelBtn = fromEvent(cancelBtn, 'click');
fromEvent(input, 'change').pipe(
switchMap(() => fromUploadBtn),
map(() => input.files[0]),
filter(file => !!file),
map(file => createFormData(file)),
exhaustMap(data => upload(data).pipe(
takeUntil(fromCancelBtn)
))
).subscribe({
next: width => setProgressBarWidth(width)
});
function setProgressBarWidth(width) {
progressBar.style.width = `${width}%`;
}
function createFormData(file) {
const form = new FormData();
form.append('file', file);
return form;
}
function upload(data) {
return new Observable(observer => {
const xhr = new XMLHttpRequest();
xhr.upload.onprogress = e => {
const progress = e.loaded / e.total * 100;
observer.next(progress);
};
xhr.onerror = e => observer.error(e);
xhr.onload = () => observer.complete();
xhr.open('POST', '/upload', true);
xhr.send(data);
return () => xhr.abort();
});
}
Мой вариант я выложил здесь [6].
Если вы работаете с Angular, то вам не нужно использовать xhr напрямую. В Angular есть HttpClient сервис. Данный сервис может отслеживать прогресс загрузки/выгрузки, для этого достаточно передать следующие параметры в post метод:
Вот как будет выглядеть метод upload в Angular:
export class UploaderService {
constructor(private http: HttpClient) { }
public upload(data: FormData): Observable<number> {
return this.http.post('/upload', data, { reportProgress: true, observe: 'events' })
.pipe(
filter(event => event.type === HttpEventType.UploadProgress),
map(event => event as HttpProgressEvent),
map(event => event.loaded / event.total * 100)
);
}
}
Оператор filter отфильтровывает только события о выгрузке. Остальные события нас не интересуют. Дальше мы приводим событие к HttpProgressEvent, чтобы получить доступ к свойствам loaded и total. Считаем процент.
HttpClient всего лишь обертка над xhr, которая избавляет нас от бойлерплейта и делает работу с HTTP проще.
Пример приложения на Angular можно найти здесь [7].
RxJS очень мощный инструмент в руках разработчика. В его арсенале есть огромнейший набор операторов на все случаи жизни. К сожалению, из-за этого порог входа в данную технологию довольно высок. И часто, люди по незнанию начинают писать свои «велосипеды», из-за чего код становится трудно поддерживаемым.
Поэтому, хочется пожелать всем читателям, не стоять на месте и не бояться экспериментировать. Изучайте RxJS. Вдруг вы наткнетесь на оператор, который может превратить 10 строчек кода в одну. Или поможет сделать код чуточку понятней.
Удачи!
Автор: limitofzero
Источник [8]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/javascript/346244
Ссылки в тексте:
[1] Observable: https://habr.com/ru/post/438642/
[2] операторы: https://habr.com/ru/post/444290/
[3] HOO операторы: https://habr.com/ru/post/450050/
[4] multer: https://github.com/expressjs/multer
[5] прогресс выгрузки: https://fetch.spec.whatwg.org/#fetch-api
[6] здесь: https://github.com/limitofzero/upload-fiels-with-rxjs
[7] здесь: https://github.com/limitofzero/angular-upload-file-with-progress
[8] Источник: https://habr.com/ru/post/487836/?utm_campaign=487836&utm_source=habrahabr&utm_medium=rss
Нажмите здесь для печати.