Доброго времени суток, дорогое читатели. Не так давно я начал изучать работу с большими данными (Map/Reduce, NoSQL...) и очень быстро узнал о фреймворке с открытым исходным кодом Apache Hadoop, за изучение которого сразу и принялся.
Данный пост рассчитан на новичков, которые тоже не так давно начали изучать Hadoop. В посте будет разобрано небольшое приложение построенное на этом фреймворке(Этакий Hello World!). Кому интересно, добро пожаловать под кат.
Данный топик не рассматривает процесс установки, настройки и проблем с запуском, однако ресурсы для изучения вы можете посмотреть внизу. Мною в работе были использованы следующие технологии:
- Linux Ubuntu 13.04;
- Oracle Java 1.7;
- Hadoop 1.1.2;
- Intellij IDEA 12;
По скольку счетчик слов (он же Word Count) продемонстрирован в подавляющем большинстве туториалов, я решил разнообразить эту тему и в качестве примера разобрал grep.
Наша реализация будет получать на вход:
- Папку с файлами(Файл) для поиска совпадений по регулярному выражению;
- Путь для сохранения результатов;
- Регулярное выражение;
На выходе мы получим файл(ы) которые содержат полные пути к файлам(ключи) в которых нашлись совпадения и строки(значения) с этими совпадениями в файле.
Весь процесс обработки данных построен на парадигме MapReduce. Суть ее в том, что мы разделяем всю работу на два этапа: map и reduce.
Итак, преступим.
Map
На этом шаге мы в качестве аргумента получаем ключ и значение. Далее эти данные проходят обработку подавая на выход список ключей и список значений.
Наша реализация map функции:
/*
* Маппер.
* Пример построен с использованием нового API org.apache.hadoop.mapreduce.*
*/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/*
* LongWritable - Тип входного ключа(Номер строки).
* Text - Тип входного значения (Строка под номером ключа).
* Text - Тип выходного ключа(Полный путь к файлу).
* Text - Тип выходного значения(Строка из файла с совпадением).
*/
public class RegexMapper extends Mapper<LongWritable, Text, Text, Text>{
private Pattern pattern;
private Text keyOut; //В качестве ключа на выход будет взят полный путь к файлу.
/*
* Метод setup() вызывается перед вызовом метода map()(который переопределен ниже).
* Используется для отделения подготовительных действий от самой map() функции.
*/
@Override
public void setup(Context context) throws IOException{
/*
* Берем сохраненный аргумент(регулярное выражение),
* который был сохранен в Driver-классе(будет описан далее).
*/
pattern = Pattern.compile(context.getConfiguration().get("regex"));
//Получаем полный путь входной строки файла (valueIn).
Path filePath = ((FileSplit) context.getInputSplit()).getPath();
keyOut = new Text(filePath.toString());
}
/*
* Сам map() метод. Создаем матчер и ищем в строке совпадения. В случае нахожде-
* ния записываем полный путь файла в качестве ключа(keyOut - получен в setup()
* методе) и строку из этого файла в качестве значения(valueIn - получена
* как аргумент метода).
*/
@Override
public void map(LongWritable key, Text valueIn, Context context)
throws IOException, InterruptedException {
Matcher matcher = pattern.matcher(valueIn.toString());
//По скольку входным значением является только одна строка файла, то нам доста-
//точно найти хотя бы одно совпадение, что бы строка подходила условиям поиска.
if (matcher.find())
context.write(keyOut, valueIn); //Запись пары ключ значение
}
}
Вот и все. Переходим к reduce.
Reduce
На этапе reduce мы получаем в качестве аргумента один ключ и все соответствующие ему значения полученные на выходе map метода(ов) для их последующей обработки. В нашем случае мы получили путь к файлу, где найден текст соответствующий заданному шаблону(ключ) и набор строк, где были найдены совпадения(список значений).
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/*
*Все ключи и значения типа Text
*/
public class RegexReducer extends Reducer<Text, Text, Text, Text> {
/*
* В методе мы форматируем полученные данные для записи в файл.
*/
@Override
public void reduce(Text keyIn, Iterable<Text> valuesIn, Context context)
throws IOException, InterruptedException {
//Для конкатенации строк воспользуемся StringBuilder.
StringBuilder valueOut = new StringBuilder();
for(Text value: valuesIn)
valueOut.append("n" + value.toString());
valueOut.append("n");
context.write(keyIn, new Text(valueOut.toString()));
}
}
С map и reduce разобрались. Осталось это все упаковать в класс-драйвер и запустить.
Driver
В классе-драйвере происходит настройка задачи(установка маппера и редусера, типа входных и выходных данных и т. д.).
В общем вот:
import com.petrez.mappers.RegexMapper;
import com.petrez.reducers.RegexReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class Grep {
public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
if(args.length != 3) {
System.out.println("Usage: <inDir> <outDir> <regex>");
ToolRunner.printGenericCommandUsage(System.out);
System.exit(-1);
}
Configuration config = new Configuration();
/*
* Сохраняем регулярное выражение для map() метода с ключом regex.
*/
config.set("regex", args[2]);
Job job = new Job(config, "grep");
/*
* Я запускаю программу из jar-файла, поэтому указание главного класса
* приложения необходимо.
*/
job.setJarByClass(Grep.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
/*
* Вот. TextInputFormat разбивает входные файлы на строки и подает их
* в качестве аргумента map функциям. В качестве
* разделителя используется символ возврата каретки.
*/
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(RegexMapper.class);
job.setReducerClass(RegexReducer.class);
job.waitForCompletion(true);
}
}
Осталось только запустить.
В силу того, что реализация рассчитана для начинающих, она упрощена в ущерб эффективности, а именно создание на каждую строку по отдельному мапперу. Данный вариант очень упрощает реализацию маппера и редусера, но сильно нагружает память. Прошу учесть.
По скольку я все упаковал в исполняемый jar-файл, то запустить нашу программку можно так:
<путь к hadoop>/bin/hadoop jar /home/hduser/HadoopGrep.jar <путь с файлами для анализа> <путь для сохранения результатов> <регулярное выражение>
Путем для сохранения результатов должна быть несуществующая директория. Если Вы настроили Hadoop в pseudo-distributed mode, то данные сохраняются в файловой системе HDFS и вам их оттуда еще нужно будет вытащить.
Материалы для изучения:
- Хороший туториал по установке Hadoop
- Очень рекомендую статьи на Yahoo(на мой взгляд даже лучше, чем на сайте проекта)
- Hadoop Stable Version API
- Описание MapReduce парадигмы(Google Research)
Всем спасибо.
Автор: methode