if(ObjectUtils.equals(rabbitClient, null)) {
final JsonObject config = vertx.getOrCreateContext().config().getJsonObject("rabbitmq");
logger.trace("rabbitmq config: " + config.encodePrettily());
RabbitMQOptions options = new RabbitMQOptions();
options.setUser(config.getString("user", "guest"));
options.setPassword(config.getString("password", "guest"));
options.setHost(config.getString("host", "localhost"));
options.setPort(config.getInteger("port", 5672));
rabbitClient = RabbitMQClient.create(vertx, options);
Future<Void> startFut = Future.future();
rabbitClient.start(startRes -> {
if (startRes.succeeded()) {
logger.trace("rabbitmq client started successfully. isOpenChannel: " + rabbitClient.isOpenChannel()
+ ", isConnected: " + rabbitClient.isConnected());
startFut.complete();
} else {
logger.error("Failed starting rabbitmq client.", startRes.cause());
startFut.fail(startRes.cause());
}
});
startFut.compose(start -> {
Future<Void> declareFut = Future.future();
rabbitClient.queueDeclare(queueName, false, false, false, null, queueDeclareRes -> {
if (queueDeclareRes.succeeded()) {
logger.info("queueName: " + queueName + " declare successfully.");
declareFut.complete();
} else {
logger.error("Failed declare queueName: " + queueName, queueDeclareRes.cause());
declareFut.fail(queueDeclareRes.cause());
}
});
return declareFut;
}).compose(declare -> {
rabbitClient.basicQos(1, basicQosRes -> {
if (basicQosRes.succeeded()) {
logger.trace("set basic Qos=1 successfully.");
result.complete(rabbitClient);
} else {
logger.error("Failed set basic Qos=1.", basicQosRes.cause());
result.fail(basicQosRes.cause());
}
});
}, result);
}
else {
result.complete(rabbitClient);
}
return result;
}