На Хабре не так уж много статей, посвященных Vert.x, раз, два и обчёлся. Поэтому решил внести свой вклад и опубликовать небольшой урок, в котором рассказано, как написать простой чат с помощью Vert.x 3.
Содержание
- Что такое Vert.x?
- О чате
- Структура проекта
- Сервер
- Клиент
- Тестирование
- Сборка и запуск исполняемого модуля
- Полный исходный код
- Полезные ресурсы
Что такое Vert.x?
Vert.x это событийно-ориентированный фреймворк работающий на JVM. На данный момент последняя версия этого фреймворка 3.2. Vert.x 3 предоставляет следующие возможности:
- Мультиязычность. Компоненты приложения могут быть разработаны на Java, JavaScript, Scala, Python, Ruby, Groovy, а также Clojure;
- Параллелизм. Довольно-таки простая модель параллелизма, освобождающая от хлопот многопоточного программирования;
- Асинхронность. Простая модель асинхронного взаимодействия без блокировки;
- Распределенная шина событий. Включающая как клиентскую, так и серверную стороны. Играет непосредственно главную роль в нашем чате;
- Java 8. Vert.x 3 требует версии Java не ниже 8.
Более подробно о чате
Приложение запускается на сервере, после развертывания публикуется адрес анонимного чата, к которому можно присоединиться, через любой браузер. По этому адресу приложение транслирует в реальном времени сообщения от всех пользователей.
Поехали!
Разработку будем вести в IntelliJ IDEA 15, достаточно Community-версии.
Структура проекта
Создаем maven-проект. К сожалению готового архетипа, для vert.x 3 нет (хотя для 2 существует), поэтому генерим обычный maven-проект. Его конечная структура будет иметь следующий вид:
src
+---main
| +---java
| | | Server.java
| | | VerticleLoader.java
| | |
| | ---webroot
| | date-format.js
| | index.html
| | vertx-eventbus.js
| |
| ---resources
---test
---java
ChatTest.java
В pom.xml задаем следующие зависимости. Где vertx-core библиотека поддержки Verticles (более подробно, что это такое, немного дальше), vertx-web – позволяет использовать обработчик событий (и не только) и vertx-unit – для модульного тестирования.
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-unit</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Сервер
Особенностью данного фреймворка является то, что все компоненты должны быть представлены в виде Verticle.
Verticle – это некоторый аналог сервлета, является атомарной единицей развёртывания. Сами разработчики описывают Verticle, как нечто похожее на актера в модели акторов. Собственно, эта конструкция позволяет организовать высокую степень параллелизма и асинхронности, чем и славится Vert.x. В реализации нашего сервера, мы наследуем абстрактный класс AbstractVerticle.
Переопределяемый нами метод start() является точкой входа в программу. Сначала выполняется развертывание приложения – функция deploy(), затем вешается обработчик – метод handle().
public class Server extends AbstractVerticle {
private Logger log = LoggerFactory.getLogger(Server.class);
private SockJSHandler handler = null;
private AtomicInteger online = new AtomicInteger(0);
//точка входа.
@Override
public void start() throws Exception {
if (!deploy()) {
log.error("Failed to deploy the server.");
return;
}
handle();
}
//...
}
Для развертывания приложения необходимо получить свободный порт, в случае, если не удалось его получить, в hostPort будет отрицательное значение. Далее создаем роутер, указываем для него адрес получателя и вешаем обработчик. И наконец, запускаем HTTP-Server на доступном порту.
//развертывание приложения.
private boolean deploy() {
int hostPort = getFreePort();
if (hostPort < 0)
return false;
Router router = Router.router(vertx);
//обработчик событий.
handler = SockJSHandler.create(vertx);
router.route("/eventbus/*").handler(handler);
router.route().handler(StaticHandler.create());
//запуск веб-сервера.
vertx.createHttpServer().requestHandler(router::accept).listen(hostPort);
try {
String addr = InetAddress.getLocalHost().getHostAddress();
log.info("Access to "CHAT" at the following address: nhttp://" + addr + ":" + hostPort);
} catch (UnknownHostException e) {
log.error("Failed to get the local address: [" + e.toString() + "]");
return false;
}
return true;
}
Процесс получения свободного порта представлен во фрагменте кода ниже. Сначала проверяется static-поле PROCESS_ARGS на наличие аргументов запуска приложения, одним из которых может быть порт развёртывания приложения, заданный пользователем. В случае, если порт не был задан, используется порт по умолчанию: 8080.
//получение свободного порта для развертывания приложения.
private int getFreePort() {
int hostPort = 8080;
//если порт задан в качестве аргумента,
// при запуске приложения.
if (Starter.PROCESS_ARGS != null
&& Starter.PROCESS_ARGS.size() > 0) {
try {
hostPort = Integer.valueOf(Starter.PROCESS_ARGS.get(0));
} catch (NumberFormatException e) {
log.warn("Invalid port: [" + Starter.PROCESS_ARGS.get(0) + "]");
}
}
//если некорректно указан порт.
if (hostPort < 0 || hostPort > 65535)
hostPort = 8080;
return getFreePort(hostPort);
}
Если в качестве аргумента конструктора создания сокета, указан параметр со значением 0, то в таком случае будет выдан случайный свободный порт.
Когда порт уже занят (например, порт 8080 уже используется другим приложением, но при этом, он указан в качестве аргумента запуска текущего приложения), выбрасывается исключение BindException, в таком случае выполняется повторная попытка получения свободного порта.
private int getFreePort(int hostPort) {
try {
ServerSocket socket = new ServerSocket(hostPort);
int port = socket.getLocalPort();
socket.close();
return port;
} catch (BindException e) {
//срабатывает, когда указанный порт уже занят.
if (hostPort != 0)
return getFreePort(0);
log.error("Failed to get the free port: [" + e.toString() + "]");
return -1;
} catch (IOException e) {
log.error("Failed to get the free port: [" + e.toString() + "]");
return -1;
}
}
В случае успешного развертывания, начинается прослушивание шины событий по адресам: chat.to.server (входящие события) и chat.to.client (исходящие события).
После обработки очередного события на шине, необходимо чекнуть это событие.
private void handle() {
BridgeOptions opts = new BridgeOptions()
.addInboundPermitted(new PermittedOptions().setAddress("chat.to.server"))
.addOutboundPermitted(new PermittedOptions().setAddress("chat.to.client"));
//обработка приходящих событий.
handler.bridge(opts, event -> {
if (event.type() == PUBLISH)
publishEvent(event);
if (event.type() == REGISTER)
registerEvent(event);
if (event.type() == SOCKET_CLOSED)
closeEvent(event);
//обратите внимание, после обработки события
// должен вызываться говорящий сам за себя метод.
event.complete(true);
});
}
Любые события, которые происходят на шине, могут быть представлены 7 следующими типами:
Тип | Событие |
---|---|
SOCKET_CREATED | возникает при создании сокета |
SOCKET_CLOSED | при закрытии сокета |
SEND | попытка отправки сообщения от клиента к серверу |
PUBLISH | публикация сообщения клиентом для сервера |
RECEIVE | уведомление от сервера, о доставленном сообщении |
REGISTER | попытка зарегистрировать обработчик |
UNREGISTER | попытка отменить зарегистрированный обработчик |
В нашем приложении нам достаточно лишь обрабатывать события с типом PUBLISH, REGISTER и SOCKET_CLOSED.
Событие с типом PUBLISH срабатывает, когда кто-то из пользователей отправляет сообщение в чат.
REGISTER – срабатывает тогда, когда пользователь регистрирует обработчик. Почему не SOCKET_CREATED? Потому что, событие с типом SOCKET_CREATED предшествует – REGISTER, и, естественно, пока клиент не зарегистрирует обработчик, он не сможет получать события.
SOCKET_CLOSED – возникает, всегда когда пользователь покидает чат или когда возникает непредвиденная ситуация.
При публикации сообщения, срабатывает обработчик и вызывает метод publishEvent. Проверяется адрес назначения, в случае, если он корректен, сообщение извлекается, затем проверяется и публикуется на шине событий для всех клиентов (в т.ч. и отправителя).
private boolean publishEvent(BridgeEvent event) {
if (event.rawMessage() != null
&& event.rawMessage().getString("address").equals("chat.to.server")) {
String message = event.rawMessage().getString("body");
if (!verifyMessage(message))
return false;
String host = event.socket().remoteAddress().host();
int port = event.socket().remoteAddress().port();
Map<String, Object> publicNotice = createPublicNotice(host, port, message);
vertx.eventBus().publish("chat.to.client", new Gson().toJson(publicNotice));
return true;
} else
return false;
}
Генерация уведомления для публикации сообщения выглядит следующим образом:
//создание уведомления о публикации сообщения.
private Map<String, Object> createPublicNotice(String host, int port, String message) {
Date time = Calendar.getInstance().getTime();
Map<String, Object> notice = new TreeMap<>();
notice.put("type", "publish");
notice.put("time", time.toString());
notice.put("host", host);
notice.put("port", port);
notice.put("message", message);
return notice;
}
Вход и выход пользователей в чат обрабатываются следующим способом:
//тип события - регистрация обработчика.
private void registerEvent(BridgeEvent event) {
if (event.rawMessage() != null
&& event.rawMessage().getString("address").equals("chat.to.client"))
new Thread(() ->
{
Map<String, Object> registerNotice = createRegisterNotice();
vertx.eventBus().publish("chat.to.client", new Gson().toJson(registerNotice));
}).start();
}
//создание уведомления о регистрации пользователя.
private Map<String, Object> createRegisterNotice() {
Map<String, Object> notice = new TreeMap<>();
notice.put("type", "register");
notice.put("online", online.incrementAndGet());
return notice;
}
//тип события - закрытие сокета.
private void closeEvent(BridgeEvent event) {
new Thread(() ->
{
Map<String, Object> closeNotice = createCloseNotice();
vertx.eventBus().publish("chat.to.client", new Gson().toJson(closeNotice));
}).start();
}
//создание уведомления о выходе пользвателя из чата.
private Map<String, Object> createCloseNotice() {
Map<String, Object> notice = new TreeMap<>();
notice.put("type", "close");
notice.put("online", online.decrementAndGet());
return notice;
}
Проверка публикуемого сообщения достаточно примитивная, но для примера и этого достаточно, т.е. вы её можете сами усложнить проверяя, например, на передачу скриптов в виде сообщения и прочих хаков.
private boolean verifyMessage(String msg) {
return msg.length() > 0
&& msg.length() <= 140;
}
Для обмена данными используется формат JSON, поэтому файл pom.xml необходимо обновить, добавив следующую зависимость:
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3.1</version>
</dependency>
Также, в нашем чате будет отображаться счетчик числа онлайн-пользователей, т.к. наше приложение многопоточное, оно гарантировано должно быть thread-safety, поэтому наиболее простой способ объявить наш счётчик как AtomicInteger.
Клиент
Создаем index.html в разделе webroot, как это представленной на структуре в начале статьи. Для общения с сервером, а точнее, с шиной событий используется библиотека vertx-eventbus.js.
Для форматирования даты, будем использовать библиотеку date-format.js, довольно-таки простая и удобная. Помимо этого, в качестве html оформления будем использовать bootstrap версии 3.3.5, sockjs.js версии 0.3.4, необходимый для библиотеки vertx-eventbus.js и jquery версии 1.11.3.
Обработчик шины событий на стороне клиента выглядит следующим образом:
var online = 0; //счетчик онлайн-пользователей.
var eb = new EventBus("/eventbus/"); //шина событий.
eb.onopen = function() {
//обработчик событий в чате.
eb.registerHandler("chat.to.client", eventChatProcessing);
};
//обработчик событий в чате.
function eventChatProcessing(err, msg) {
var event = jQuery.parseJSON(msg.body);
if (event.type == 'publish') { //сообщение.
var time = Date.parse(event.time);
var formattedTime = dateFormat(time, "dd.mm.yy HH:MM:ss");
//добавить сообщение.
appendMsg(event.host, event.port, event.message, formattedTime);
} else { //изменение числа пользователей.
//type: register или close.
online = event.online;
$('#online').text(online);
}
};
В случае, если тип события publish (т.е. публикация сообщения), то данные из события (event) формируются в кортеж и присоединяются к таблице сообщений. Иначе, когда тип события соответствует новому или ушедшему пользователю, просто обновляется счетчик онлайн пользователей. Функция добавления сообщения довольно-таки проста.
//добавление нового сообщения.
function appendMsg(host, port, message, formattedTime) {
var $msg = $('<tr bgcolor="#dff0d8"><td align="left">' + formattedTime + '</td><td align="left">' + host + ' [' + port + ']' + '</td><td>' + message + '</td></tr>');
var countMsg = $('#messages tr').length;
if (countMsg == 0)
$('#messages').append($msg);
else
$('#messages > tbody > tr:first').before($msg);
}
Во время отправки сообщения, оно сначала публикуется по адресу “chat.to.server”, где его обрабатывает сервер, в случае, если сообщение проходит верификацию, оно рассылается всем клиентам, в т.ч. и отправителю.
$(document).ready(function() {
//событие отправления сообщения.
$('#chatForm').submit(function(evt) {
evt.preventDefault();
var message = $('#message').val();
if (message.length > 0) {
//отправление сообщения на шину событий.
eb.publish("chat.to.server", message);
$('#message').val("").focus();
countChar();
}
});
});
Ну и наконец последний метод, который обрабатывает количество введенных символов, по условию, пользователь не может ввести сообщение длиной более 140 символов.
//счетчик введенных символов.
function countChar() {
var len = $('#message').val().length;
if (len > 140) {
var msg = $('#message').val().substring(0, 140);
$('#message').val(msg);
} else {
$('#charNum').text(140 - len);
var per = 100 / 140 * len;
$('#charNumProgressBar').css('width', per + '%').attr('aria-valuenow', per);
}
};
Полная версия index.html, включая разметку, размещена в конце статьи.
После того, как мы написали серверную и клиентскую части, наступила очередь запуска приложения. Для запуска и удобной отладки, я рекомендую написать собственный загрузчик Verticle, хотя и есть более простая альтернатива, которую я приведу немного позже.
Единственно, значение, которое инициализирует переменную dir должно быть актуально, т.е. в действительности должен существовать такой путь. А также, переменная verticleID должна инициализироваться именем запускаемого verticle-класса, весь остальной код не подлежит изменению.
public class VerticleLoader {
private static Vertx vertx;
public static Vertx getVertx() {
return vertx;
}
public static void load() {
load(null);
}
public static void load(Handler<AsyncResult<String>> completionHandler) {
VertxOptions options = new VertxOptions().setClustered(false);
//путь до verticle-класса.
String dir = "chat/src/main/java/";
try {
File current = new File(".").getCanonicalFile();
if (dir.startsWith(current.getName()) && !dir.equals(current.getName())) {
dir = dir.substring(current.getName().length() + 1);
}
} catch (IOException e) {
}
System.setProperty("vertx.cwd", dir);
String verticleID = Server.class.getName();
Consumer<Vertx> runner = vertx ->
{
try {
if (completionHandler == null)
vertx.deployVerticle(verticleID);
else
vertx.deployVerticle(verticleID, completionHandler);
} catch (Throwable t) {
t.printStackTrace();
}
};
if (options.isClustered()) {
Vertx.clusteredVertx(options, res ->
{
if (res.succeeded()) {
vertx = res.result();
runner.accept(vertx);
} else {
res.cause().printStackTrace();
}
});
} else {
vertx = Vertx.vertx(options);
runner.accept(vertx);
}
}
public static void main(String[] args) {
load();
}
}
Теперь, когда загрузчик готов, создадим конфигурацию запуска: Run – Edit Configuration… – Add New Configuration (Alt + Insert) – Application. Указываем Main Class как VerticleLoader, сохраняем конфигурацию и запускаем.
PROFIT!
Обещанная альтернатива.
Тестирование
Давайте протестируем разработанное нами приложение. Делать мы это будем с использованием JUnit, поэтому необходимо снова открыть pom.xml и добавить следующую зависимость:
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
В setUp мы создаем экземпляр Vertx и развертываем на него нашу Verticle. В отличии от традиционного JUnit методов, все текущие методы получают еще TestContext. Задача этого объекта соблюдать асинхронность наших тестов.
В методе tearDown() для объекта TestContext вызывается asyncAssertSuccess(), он терпит неудачу, если при завершении работы Verticle возникли проблемы.
@RunWith(VertxUnitRunner.class)
public class ChatTest {
private Vertx vertx;
private int port = 8080;
private Logger log = LoggerFactory.getLogger(ChatTest.class);
//@Ignore
@Before
public void setUp(TestContext context) throws IOException {
VerticleLoader.load(context.asyncAssertSuccess());
vertx = VerticleLoader.getVertx();
}
//@Ignore
@After
public void tearDown(TestContext context) {
vertx.close(context.asyncAssertSuccess());
}
//...
}
В методе loadVerticleTest мы проверяем загрузку нашего приложения. Создаем клиента и пытаемся удостовериться, что приложение, развернутое по указанному нами адресу доступно. В случае успеха, мы получаем код состояния 200.
Затем, пытаемся получить содержимое страницы, заголовок которой должен содержать текст “Chat”.
Так как запрос и ответ являются асинхронными операциями, поэтому необходимо это как-то контролировать и получать уведомления, когда тест завершился для этого используется объект Async, вызывающий всегда метод complete() по завершению теста.
@Test
public void loadVerticleTest(TestContext context) {
log.info("*** loadVerticleTest ***");
Async async = context.async();
vertx.createHttpClient().getNow(port, "localhost", "/", response ->
{
context.assertEquals(response.statusCode(), 200);
context.assertEquals(response.headers().get("content-type"), "text/html");
response.bodyHandler(body ->
{
context.assertTrue(body.toString().contains("<title>Chat</title>"));
async.complete();
});
});
}
В методе eventBusTest создается клиент шины событий и вешается обработчик. В то время, пока клиент ждет какие-либо события на шине, публикуется сообщение. Обработчик реагирует на это и проверяет тело входящего события на эквивалентность, в случае успешной проверки тест завершается вызовом async.complete().
@Test
public void eventBusTest(TestContext context) {
log.info("*** eventBusTest ***");
Async async = context.async();
EventBus eb = vertx.eventBus();
eb.consumer("chat.to.server").handler(message ->
{
String getMsg = message.body().toString();
context.assertEquals(getMsg, "hello");
async.complete();
});
eb.publish("chat.to.server", "hello");
}
Запускаем тесты.
Сборка и запуск исполняемого модуля
Для этого необходимо в pom.xml добавить плагин maven-shade-plugin. Где Main-Verticle в нашем случае должен указывать на класс Server.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>io.vertx.core.Starter</Main-Class>
<Main-Verticle>Server</Main-Verticle>
</manifestEntries>
</transformer>
</transformers>
<artifactSet/>
<outputFile>${project.build.directory}/${project.artifactId}-${project.version}-fat.jar</outputFile>
</configuration>
</execution>
</executions>
</plugin>
Выполняем команду Run Maven Build, после чего в каталоге target появится chat-1.0-fat.jar. Для запуска приложения исполняемый модуль и папка webroot должны находиться в одном каталоге. Чтобы развернуть наше приложение на порту 12345 необходимо выполнить команду:
java -jar chat-1.0-fat.jar 12345
На этом всё. Успехов!
Полный исходный код
import com.google.gson.Gson;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Starter;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.StaticHandler;
import io.vertx.ext.web.handler.sockjs.BridgeEvent;
import io.vertx.ext.web.handler.sockjs.BridgeOptions;
import io.vertx.ext.web.handler.sockjs.PermittedOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import static io.vertx.ext.web.handler.sockjs.BridgeEvent.Type.*;
public class Server extends AbstractVerticle {
private Logger log = LoggerFactory.getLogger(Server.class);
private SockJSHandler handler = null;
private AtomicInteger online = new AtomicInteger(0);
//точка входа.
@Override
public void start() throws Exception {
if (!deploy()) {
log.error("Failed to deploy the server.");
return;
}
handle();
}
//развертывание приложения.
private boolean deploy() {
int hostPort = getFreePort();
if (hostPort < 0)
return false;
Router router = Router.router(vertx);
//обработчик событий.
handler = SockJSHandler.create(vertx);
router.route("/eventbus/*").handler(handler);
router.route().handler(StaticHandler.create());
//запуск веб-сервера.
vertx.createHttpServer().requestHandler(router::accept).listen(hostPort);
try {
String addr = InetAddress.getLocalHost().getHostAddress();
log.info("Access to "CHAT" at the following address: nhttp://" + addr + ":" + hostPort);
} catch (UnknownHostException e) {
log.error("Failed to get the local address: [" + e.toString() + "]");
return false;
}
return true;
}
//получение свободного порта для развертывания приложения.
private int getFreePort() {
int hostPort = 8080;
//если порт задан в качестве аргумента,
// при запуске приложения.
if (Starter.PROCESS_ARGS != null
&& Starter.PROCESS_ARGS.size() > 0) {
try {
hostPort = Integer.valueOf(Starter.PROCESS_ARGS.get(0));
} catch (NumberFormatException e) {
log.warn("Invalid port: [" + Starter.PROCESS_ARGS.get(0) + "]");
}
}
//если некорректно указан порт.
if (hostPort < 0 || hostPort > 65535)
hostPort = 8080;
return getFreePort(hostPort);
}
//если в качестве порта указано значение 0,
// то выдается случайный свободный порт.
private int getFreePort(int hostPort) {
try {
ServerSocket socket = new ServerSocket(hostPort);
int port = socket.getLocalPort();
socket.close();
return port;
} catch (BindException e) {
//срабатывает, когда указанный порт уже занят.
if (hostPort != 0)
return getFreePort(0);
log.error("Failed to get the free port: [" + e.toString() + "]");
return -1;
} catch (IOException e) {
log.error("Failed to get the free port: [" + e.toString() + "]");
return -1;
}
}
private void handle() {
BridgeOptions opts = new BridgeOptions()
.addInboundPermitted(new PermittedOptions().setAddress("chat.to.server"))
.addOutboundPermitted(new PermittedOptions().setAddress("chat.to.client"));
//обработка приходящих событий.
handler.bridge(opts, event -> {
if (event.type() == PUBLISH)
publishEvent(event);
if (event.type() == REGISTER)
registerEvent(event);
if (event.type() == SOCKET_CLOSED)
closeEvent(event);
//обратите внимание, после обработки события
// должен вызываться говорящий сам за себя метод.
event.complete(true);
});
}
//тип события - публикация сообщения.
private boolean publishEvent(BridgeEvent event) {
if (event.rawMessage() != null
&& event.rawMessage().getString("address").equals("chat.to.server")) {
String message = event.rawMessage().getString("body");
if (!verifyMessage(message))
return false;
String host = event.socket().remoteAddress().host();
int port = event.socket().remoteAddress().port();
Map<String, Object> publicNotice = createPublicNotice(host, port, message);
vertx.eventBus().publish("chat.to.client", new Gson().toJson(publicNotice));
return true;
} else
return false;
}
//создание уведомления о публикации сообщения.
private Map<String, Object> createPublicNotice(String host, int port, String message) {
Date time = Calendar.getInstance().getTime();
Map<String, Object> notice = new TreeMap<>();
notice.put("type", "publish");
notice.put("time", time.toString());
notice.put("host", host);
notice.put("port", port);
notice.put("message", message);
return notice;
}
//тип события - регистрация обработчика.
private void registerEvent(BridgeEvent event) {
if (event.rawMessage() != null
&& event.rawMessage().getString("address").equals("chat.to.client"))
new Thread(() ->
{
Map<String, Object> registerNotice = createRegisterNotice();
vertx.eventBus().publish("chat.to.client", new Gson().toJson(registerNotice));
}).start();
}
//создание уведомления о регистрации пользователя.
private Map<String, Object> createRegisterNotice() {
Map<String, Object> notice = new TreeMap<>();
notice.put("type", "register");
notice.put("online", online.incrementAndGet());
return notice;
}
//тип события - закрытие сокета.
private void closeEvent(BridgeEvent event) {
new Thread(() ->
{
Map<String, Object> closeNotice = createCloseNotice();
vertx.eventBus().publish("chat.to.client", new Gson().toJson(closeNotice));
}).start();
}
//создание уведомления о выходе пользвателя из чата.
private Map<String, Object> createCloseNotice() {
Map<String, Object> notice = new TreeMap<>();
notice.put("type", "close");
notice.put("online", online.decrementAndGet());
return notice;
}
//довольно простая проверка сообщения,
// конечно её можно усложнить,
// но для пример и этого достаточно ;)
private boolean verifyMessage(String msg) {
return msg.length() > 0
&& msg.length() <= 140;
}
}
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.StringEscapeUtils;
import java.io.File;
import java.io.IOException;
import java.util.function.Consumer;
public class VerticleLoader {
private static Vertx vertx;
public static Vertx getVertx() {
return vertx;
}
public static void load() {
load(null);
}
public static void load(Handler<AsyncResult<String>> completionHandler) {
VertxOptions options = new VertxOptions().setClustered(false);
//путь до verticle-класса.
String dir = "chat/src/main/java/";
try {
File current = new File(".").getCanonicalFile();
if (dir.startsWith(current.getName()) && !dir.equals(current.getName())) {
dir = dir.substring(current.getName().length() + 1);
}
} catch (IOException e) {
}
System.setProperty("vertx.cwd", dir);
String verticleID = Server.class.getName();
Consumer<Vertx> runner = vertx ->
{
try {
if (completionHandler == null)
vertx.deployVerticle(verticleID);
else
vertx.deployVerticle(verticleID, completionHandler);
} catch (Throwable t) {
t.printStackTrace();
}
};
if (options.isClustered()) {
Vertx.clusteredVertx(options, res ->
{
if (res.succeeded()) {
vertx = res.result();
runner.accept(vertx);
} else {
res.cause().printStackTrace();
}
});
} else {
vertx = Vertx.vertx(options);
runner.accept(vertx);
}
}
public static void main(String[] args) {
load();
}
}
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.io.IOException;
@RunWith(VertxUnitRunner.class)
public class ChatTest {
private Vertx vertx;
private int port = 8080;
private Logger log = LoggerFactory.getLogger(ChatTest.class);
//@Ignore
@Before
public void setUp(TestContext context) throws IOException {
//развертывание нашей Verticle.
VerticleLoader.load(context.asyncAssertSuccess());
vertx = VerticleLoader.getVertx();
}
//@Ignore
@After
public void tearDown(TestContext context) {
vertx.close(context.asyncAssertSuccess());
}
//@Ignore
@Test
public void loadVerticleTest(TestContext context) {
log.info("*** loadVerticleTest ***");
Async async = context.async();
vertx.createHttpClient().getNow(port, "localhost", "/", response ->
{
//проверка доступности развернутого нами приложения.
context.assertEquals(response.statusCode(), 200);
context.assertEquals(response.headers().get("content-type"), "text/html");
//проверка содержимого страницы.
response.bodyHandler(body ->
{
context.assertTrue(body.toString().contains("<title>Chat</title>"));
async.complete();
});
});
}
//@Ignore
@Test
public void eventBusTest(TestContext context) {
log.info("*** eventBusTest ***");
Async async = context.async();
EventBus eb = vertx.eventBus();
//ожидание события на шине.
eb.consumer("chat.to.server").handler(message ->
{
String getMsg = message.body().toString();
context.assertEquals(getMsg, "hello");
async.complete();
});
//отправка сообщения на шину.
eb.publish("chat.to.server", "hello");
}
}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Chat</title>
<meta charset="windows-1251">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="//cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
<link rel="stylesheet" href="http://maxcdn.bootstrapcdn.com/bootstrap/3.3.5/css/bootstrap.min.css">
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.3/jquery.min.js"></script>
<script src="http://maxcdn.bootstrapcdn.com/bootstrap/3.3.5/js/bootstrap.min.js"></script>
<script src="date-format.js"></script>
<script src="vertx-eventbus.js"></script>
<style type="text/css">
body {
padding-top: 40px;
padding-bottom: 40px;
background-color: #f5f5f5;
}
.received{
width: 160px;
font-size: 10px;
}
input[type=text]:focus, textarea:focus{
box-shadow: 0 0 5px #4cae4c;
border: 1px solid #4cae4c;
}
.tab-content{
padding:5px
}
</style>
<script>
var online = 0; //счетчик онлайн-пользователей.
var eb = new EventBus("/eventbus/"); //шина событий.
eb.onopen = function() {
//обработчик событий в чате.
eb.registerHandler("chat.to.client", eventChatProcessing);
};
//обработчик событий в чате.
function eventChatProcessing(err, msg) {
var event = jQuery.parseJSON(msg.body);
if (event.type == 'publish') {//сообщение.
var time = Date.parse(event.time);
var formattedTime = dateFormat(time, "dd.mm.yy HH:MM:ss");
//добавить сообщение.
appendMsg(event.host, event.port, event.message, formattedTime);
} else { //изменение числа пользователей.
//type: register или close.
online = event.online;
$('#online').text(online);
}
};
//добавление нового сообщения.
function appendMsg(host, port, message, formattedTime){
var $msg = $('<tr bgcolor="#dff0d8"><td align="left">' + formattedTime
+ '</td><td align="left">' + host + ' [' + port + ']'
+ '</td><td>' + message
+ '</td></tr>');
var countMsg = $('#messages tr').length;
if (countMsg == 0)
$('#messages').append($msg);
else
$('#messages > tbody > tr:first').before($msg);
}
$(document).ready(function() {
//событие отправления сообщения.
$('#chatForm').submit(function(evt) {
evt.preventDefault();
var message = $('#message').val();
if (message.length > 0) {
//отправление сообщения на шину событий.
eb.publish("chat.to.server", message);
$('#message').val("").focus();
countChar();
}
});
});
//счетчик введенных символов.
function countChar() {
var len = $('#message').val().length;
if (len > 140) {
var msg = $('#message').val().substring(0, 140);
$('#message').val(msg);
} else {
$('#charNum').text(140 - len);
var per = 100 / 140 * len;
$('#charNumProgressBar').css('width', per+'%').attr('aria-valuenow', per);
}
};
</script>
</head>
<body>
<div class="container chat-wrapper">
<form id="chatForm">
<h2 align="center" class="alert alert-success">CHAT ROOM</h2>
<fieldset>
<div class="input-group input-group-lg">
<span class="input-group-addon" id="onlineIco">
<span class="glyphicon glyphicon-eye-open"></span>
</span>
<span class="input-group-addon" id="online">
<span class="glyphicon glyphicon-option-horizontal"></span>
</span>
<input type="text" maxlength="141" autocomplete="off" class="form-control"
placeholder="What's new?" id="message" aria-describedby="sizing-addon1"
onkeyup="countChar()"/>
<span class="input-group-btn">
<button class="btn btn-success" type="submit">
<span class="glyphicon glyphicon-send"></span>
</button>
</span>
</div>
</fieldset>
<h3 id="charNum">140</h3>
<div class="progress">
<div id="charNumProgressBar" class="progress-bar progress-bar-success active" role="progressbar"
aria-valuenow="0" aria-valuemin="0" aria-valuemax="100" style="width: 0%">
<span class="sr-only">100% Complete</span>
</div>
</div>
<div class="panel panel-success">
<div class="panel-heading"><h3>New messages</h3></div>
<table id="messages" class="table table-hover" width="100%">
<colgroup>
<col style="width:10%">
<col style="width:10%">
<col style="width:10%">
</colgroup>
</table>
</div>
</form>
</div>
</body>
</html>
Полезные ресурсы
- Этот проект на GitHub;
- Vert.x Documentation;
- Vert.x-Web Documentation;
- My first Vert.x 3 Application;
- Vert.x examples on GitHub;
- Vert.x Tutorial by Jakob Jenkov.
Автор: oxaoo