biterp / pinkrabbitmq Goto Github PK
View Code? Open in Web Editor NEWВнешняя Native API компонента для взаимодействия с RabbitMQ из 1С
License: MIT License
Внешняя Native API компонента для взаимодействия с RabbitMQ из 1С
License: MIT License
Если у кролика заканчивается оперативная память, то он не разывает существующие соединения, а "завешивает" т.е. ничего по ним не отвечает, а ждет когда у него память освободится. Соответственно от компоненты ожидается, что:
даже если сам севрер кролика повесил соединение, то по таймауту сама компонента его прервет (не важно при отправке или при получении соединение зависло)
В компоненте не реализована поддержка TLS.
Соединение с сервером через защищенное подключение (например, порт 5671) возвращает ошибку "Wrong login, password or vhost".
Воспроизведение
Способ воспроизведения:
В случае, если сообщения обрабатываются пачками по N сообщений, возникает потребность обеспечить подтверждение окончания обработки сразу нескольких сообщений.
Сейчас это реазиуется путем запроса N сообщений, обработки на стороне потребителя, потом повторного получения с отправкой подтверждения на каждое сообщение.
При обработке, все сообщения удаляются из очереди, хотя я их не АSKаю, использую метод BasicReject(ID).
После выполнения данного кода, все сообщения удаляются из очереди, хотя необходимо, чтобы они вернулись назад в очередь.
Пример кода:
Клиент = PinkRabbitMQ_Подключиться();
ИмяОчереди = "q.site.1c.test";
Попытка
//Клиент.DeclareQueue(ИмяОчереди, Ложь, Истина, Ложь, Ложь);
Потребитель = Клиент.BasicConsume(ИмяОчереди, "", Ложь, Ложь, 1);
Сообщение = ""; //Первоначально задаем параметры
ID = 0; //Первоначально задаем параметры
Пока Клиент.BasicConsumeMessage("", Сообщение, ID, 5000) Цикл
// Начало Обработчика сообщения
Сообщить("Успешно! Из очереди прочитано сообщение " + Сообщение);
// Конец Обработчика сообщения
Клиент.BasicReject(ID);
Сообщение = ""; // Обнуляем, чтобы избежать утечку памяти
ID = 0; // Обнуляем, чтобы избежать утечку памяти
КонецЦикла;
Клиент.BasicCancel("");
Исключение
Сообщить(Клиент.GetLastError());
КонецПопытки;
Доработать компоненту так, чтобы при получении сообщений она слушала несколько очередей и тогда можно было бы снять ограничение 1 сеанс 1С - одна очередь из которой читаются данные.
Подробное описание проблемы см. https://partners.v8.1c.ru/forum/message/1846350
Текущая проблема которую решаем:
" Если нужно забирать данные из нескольких источников (очередей) то нужно несколько регл. заданий. Т.е. при таком подходе регл. заданий должно быть столько же, сколько очередей. А это может приводить к проблемам (например когда 10 очередей, в эти 10 очередей одновременно начинают приходить сообщения, то 10 регл. заданий, при загрузке этих сообщений одномоментно, съедают 10 ядер процессора, и на это повлиять никак нельзя)"
В последней версии часто получаем странные ошибки вида
AMQP server timeout error
Например, вызываю Connect
, потом сразу DeclareExchange
- и с вероятностью ~20% получаю ошибку выше.
При этом если после Connect
сделать паузу хотя бы в 500 мс, то ошибки нет.
На версии 1.10 все хорошо в тех же условиях.
Клиент = Новый("AddIn.BITERP.PinkRabbitMQ");
TimeOut = 50000;
Попытка
Клиент.Connect(RabbitMQServerURL, RabbitMQPort, RabbitMQLogin, RabbitMQPassword, RabbitMQVHost,,,10);
Клиент.DeclareQueue(ИмяОчереди, Ложь, Истина, Ложь, Ложь);
Потребитель = Клиент.BasicConsume(ИмяОчереди, "", Ложь, Ложь, 0);
Пока Клиент.BasicConsumeMessage("", ОтветноеСообщение, ТегСообщения, TimeOut) Цикл //Тут зависает ровно на время таймаута вне зависимости от размера сообщения
Клиент.BasicAck(ТегСообщения);
ОтветноеСообщение = ""; // Обнуляем, чтобы избежать утечку памяти
ТегСообщения = 0; // Обнуляем, чтобы избежать утечку памяти
КонецЦикла;
Клиент.BasicCancel("");
При вызове метода BasicConsumeMessage время ожидания сообщения ровняется таймауту, вне зависимости от размера сообщения.
PinkRabbitMQ v2.1.1.132
Версия компоненты 1.8.
Пример выполняемого кода.
Потребитель = Клиент.BasicConsume(Параметры.Очередь, "", Истина, Ложь, 0);
// После этого вызова Потребитель = ""
// Второй параметр не влияет на дальнейший результат.
Пока Клиент.BasicConsumeMessage(Потребитель, ОтветноеСообщение, ТегСообщения, 1) Цикл
// В цикл не заходит.
ОтветноеСообщение = ""; // Обнуляем, чтобы избежать утечку памяти
ТегСообщения = 0; // Обнуляем, чтобы избежать утечку памяти
КонецЦикла;
При попытке собрать PinkRabbitMQWindows (например Release - x64) в VS (Сборка -> Собрать решение) возникают ошибки:
1>C:\Distr\PinkRabbitMQ\PinkRabbitMQWindows\NativeAPI\RabbitMQClient.cpp(6,10): fatal error C1083: Не удается открыть файл включение: Poco/Net/NetException.h: No such file or directory
1>C:\Distr\PinkRabbitMQ\PinkRabbitMQWindows\NativeAPI\SimplePocoHandler.cpp(7,10): fatal error C1083: Не удается открыть файл включение: Poco/Net/StreamSocket.h: No such file or directory
Для исправления ошибки потребовалось скачать https://github.com/pocoproject/poco/archive/poco-1.9.0-release.zip и скопировать оттуда каталог Net в PinkRabbitMQWindows\libs\poco-poco-1.9.0-release\Net.
З.Ы. Сначала скачал poco 1.10.1 (крайний) и долго не мог понять, почему у меня компонента крашится на методе connect...
Необходимо подключиться к кролику и ожидать сообщения.
Клиент.Connect("127.0.0.1", 5672, "user", "password", "vhost");
Потребитель = Клиент.BasicConsume(ИмяОчереди, "", Истина, Ложь, 0);
Но примерно через 3 минуты соединение разрывается.
Похожая проблема описана здесь и дано решение.
Connection lost error after ~3 minutes.
Как добавить обработку onHeartbeat ?
virtual void onHeartbeat(AMQP::TcpConnection *connection) {
std::cout << "heartbeat" << std::endl;
connection->heartbeat();
}
P.S. Не силен в c++, поэтому обращаюсь к сообществу :)
Здравствуйте.
Реализуйте пожалуйста опцию noConfirm в методе BasicConsume чтобы можно было повторно получать сообщения.
В данный момент метод basicReject не оставляет сообщение в кролике.
// извините, моя ошибка
bool RabbitMQClient::basicConsumeMessage(std::string& outdata, std::uint64_t& outMessageTag, uint16_t timeout) {
updateLastError("");
std::chrono::milliseconds timeoutSec{ timeout };
auto end = std::chrono::system_clock::now() + timeoutSec;
while (!readQueue.empty() || (end - std::chrono::system_clock::now()).count() > 0) {
Последняя строка организует бесконечный цикл с проверкой условий до момента истечения таймаута или появления сообщений в очереди. Это приводит к 100% загрузки одного процессорного ядра. Рекомендуется заменить данную конструкцию на конструкцию "wait_until". Более подробно можно почитать здесь: https://en.cppreference.com/w/cpp/thread/condition_variable/wait_until
Замена приведет к удалению паразитной загрузки процессора.
Возник такой вопрос по работе кода, приведенного в README.md:
Пока Клиент.BasicConsumeMessage("", ОтветноеСообщение, ТегСообщения, 5000) Цикл
Клиент.BasicAck(ТегСообщения);
Сообщить("Успешно! Из очереди прочитано сообщение " + ОтветноеСообщение);
ОтветноеСообщение = ""; // Обнуляем, чтобы избежать утечку памяти
ТегСообщения = 0; // Обнуляем, чтобы избежать утечку памяти
КонецЦикла;
Правильно я понимаю, схему работы с компонентой:
ВОПРОСЫ:
хотим использовать вашу компоненту (а может и сам адаптер) в проекте, но пока нет уверенности, что компонента работает корректно и (самое главное) оптимально.
Описание:
Если запустить последовательно в рамках одной сессии 2 цикла чтения сообщения, то второй цикл не прочитает ни одного сообщения, даже если они есть в очереди
Способ обхода:
Заново инициализировать компоненту при повторном чтении
Псевдокод:
Процедура ПрочитатьСообщениеКлиентСервер(КлиентКомпоненты, Форма)
Попытка
КлиентКомпоненты.Connect(
Форма.Адрес,
Форма.Порт,
Форма.Логин,
Форма.Пароль,
Форма.ВиртуальныйХост);
ИмяОчереди = Форма.ИмяОчереди;
ТегСообщения = 0;
Попытка
КлиентКомпоненты.DeclareQueue(ИмяОчереди, Ложь, Ложь, Ложь, Ложь);
Потребитель = КлиентКомпоненты.BasicConsume(ИмяОчереди, "", Истина, Ложь, 0);
ОтветноеСообщение = "";
//Если КлиентКомпоненты.BasicConsumeMessage("", ОтветноеСообщение, 5) Тогда
Если КлиентКомпоненты.BasicConsumeMessage("", ОтветноеСообщение, ТегСообщения,1000) Тогда
КлиентКомпоненты.BasicAck(ТегСообщения);
Форма.ОтветноеСообщение = ОтветноеСообщение;
ТекстСообщения = НСтр("ru='Сообщение успешно прочитано!'");
Иначе
Форма.ОтветноеСообщение = ОтветноеСообщение;
ТекстСообщения = НСтр("ru='Очередь пустая!'");
КонецЕсли;
Сообщить(ТекстСообщения);
// Обнуляем, чтобы избежать утечку памяти
ОтветноеСообщение = "";
ТегСообщения = 0;
КлиентКомпоненты.BasicCancel("");
Исключение
ВызватьИсключение КлиентКомпоненты.GetLastError();
КонецПопытки;
Исключение
СистемнаяОшибка = ОписаниеОшибки();
ТекстСообщения = "Ошибка чтения сообщения!%СистемнаяОшибка%";
ТекстСообщения = СтрЗаменить(ТекстСообщения, "%СистемнаяОшибка%", СистемнаяОшибка);
ВызватьИсключение ТекстСообщения;
КонецПопытки;
КонецПроцедуры
Столкнулся с рядом проблем при попытке использования компоненты:
Если делать так
Клиент = Новый("AddIn.BITERP.PinkRabbitMQ");
ВыходнойПараметр = "";
Попытка
Пока Клиент.BasicConsumeMessage(Потребитель, ВыходнойПараметр, Таймаут) Цикл
// Работа с компонентой
Если Клиент.CorrelationId <> "МОЙ_ИД" Тогда
ВызватьИсключение "Ошибка чтения свойств!"; // ТАК ДЕЛАТЬ СТРОГО НЕ РЕКОМЕНДУЕТСЯ!!!
КонецЕсли;
ВыходнойПараметр = "";
КонецЦикла;
Исключение
Клиент = неопределено;
КонецПопытки
Это будет корректно? Вызывать исключение в попытке?
При получении сообщения метод GetPriority возвращает неопределено.
Если получить сообщение через веб интерфейс, свойства сообщения:
Properties
priority: 8
delivery_mode: 2
ошибся)
В файле /PinkRabbitMQWindows/NativeAPI/CAddInNative.cpp есть такой код
std::string outdata;
...
setWStringToTVariant(&paParams[1], Utils::stringToWs(outdata).c_str());
...
В методе Utils::stringToWs реализовано такое преобразование из string в широкую строку wstring:
std::wstring Utils::stringToWs(const std::string& s)
{
std::wstring_convert<std::codecvt_utf8<wchar_t>> conv;
return conv.from_bytes(s);
}
Если строка s будет содержать некоторые спец символы unicode, например такие как неразрывный пробел (U+00A0), то это вызовет ошибку преобразования и исходная строка в лучшем случае получится обрезанной. Может будет более безопаснее использовать проверенную?
boost::locale::conv::utf_to_utf<wchar_t>(s);
Привет! То что на картинке это баг или фича? Никита Грызлов в чате написал что возмоджно это баг вот тут https://t.me/ssl1c/86188 Подробное обсуждение было там же в чате, но могу и предоставить доп. информацию если потребуется
Например, для задач телефонии Asterisk AMI + RabbitMQ + компонента + 1С:
В 1С нельзя вызвать клиентский метод с сервера. Но компонента может работать на клиенте и на сервере.
В частности, на сервере можно:
На клиенте можно:
Идея для использования на клиенте:
Насколько я знаю, это будет работать только на клиенте - на сервере "внешние события не обрабатываются".
При сборке проекта "PinkRabbitMQLinux" возникает ошибка:
Ошибка openssl/ssl.h: No such file or directory
19 | #include <openssl/ssl.h>
| ^~~~~~~~~~~~~~~
compilation terminated. PinkRabbitMQLinux
\PinkRabbitMQLinux\src\amqp\include\amqpcpp\linux_tcp\tcpparent.h
Попытка сборки была под Win 10 с помощью Visual Studio 2019 + WSL.
Какую версию библиотеки openssl нужно добавить?
Можно ли это сделать непосредственно в репозитории?
Доброго дня. Собрал компоненту на Ubuntu 18.04 файл .so сформировался, хотя в тест выдал такую ошибку:
libPinkRabbitMQ64.so: undefined reference to `AMQP::Field::operator std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&() const'
collect2: error: ld returned 1 exit status
CMakeFiles/unittest.dir/build.make:125: recipe for target 'unittest' failed
make[2]: *** [unittest] Error 1
CMakeFiles/Makefile2:104: recipe for target 'CMakeFiles/unittest.dir/all' failed
make[1]: *** [CMakeFiles/unittest.dir/all] Error 2
Makefile:94: recipe for target 'all' failed
make: *** [all] Error 2
собранный файл .so включил в MANIFEST.zip и засунул в конфу. При вызове функции возвращается "Ложь", то есть компонента отказывается подключаться:
КомпонентаПодключена = ПодключитьВнешнююКомпоненту(
АдресВоВременномХранилище,
"BITERP",
ТипВнешнейКомпоненты.Native
);
Собирал так как это сказано в ридми. Действительно ли компоненту можно собрать только на Ubuntu 16 и это не рекомендация, а жесткое требование?
Добрый день!
Компонента сейчас предоставляет доступ к свойствам сообщения (MessageId, ContentType и пр.), но не к Headers. Можно ли добавить такую возможность?
Это было бы полезно, например, при приеме сообщений MQTT через Rabbit.
Перестала работать сборка компоненты linux через sln проект
Дано:
Компонента версии 1.8
Ubuntu 18.04, платформа 1С 8.3.13-16
При вызове метода Connect("server", "5672", "admin", "1234", "/")
падает рабочий процесс сервера 1С.
Ожидаемое поведение:
Как и в Windows, должно выводиться сообщение о том, что в параметре 1 передано значение неправильного типа. В примере "5672" - это Строка, а должно быть 5672 - Число.
Поправьте README.md
Пункт «Создание очереди, отправка и получение сообщения на клиенте»
Если вы хотите показать, что Клиент и Потребитель существуют отдельно, то разнесите код на несколько Попыток.
Подробности см. https://partners.v8.1c.ru/forum/t/1846210/m/1846518
В RabbitMQ есть набор свойств, передаваемых вместе с сообщением:
appID
messageID
typeName
correlationID
ContentEncoding
ContentType
UserID
ClusterID
Expiration
ReplyTo
Нужно добавить эти свойства в компоненту,, чтобы можно было их записывать / читать при обмене сообшениями
2020-04-19 07:00:03.037 [warning] <0.794.427> closing AMQP connection <0.794.427>..................
client unexpectedly closed TCP connection
Здравствуйте.
Не очень в плюсах, но в линуксе данный цикл грузит ядро на 100%
while(!thiz->connection->closed()) {
event_base_loop(loop, EVLOOP_NONBLOCK);
}
См документацию https://www.rabbitmq.com/priority.html
Если подключении компоненты и отправке в цикле сообщений, когда возникает ошибка от компоненты "AMQP server timeout error", то при КлиентКомпоненты = Неопределено, происходит перезапуск всего действия. Например, если выгрузка начата по кнопке - весь алгоритм команды кнопки начинается заново и, соответственно начинается вся выгрузка заново.
Процедура ОтправкаСообщений(ПараметрыПодключения, ТЗ)
Попытка
КлиентКомпоненты = ПодключитьсяКRabbit(ПараметрыПодключения);
Исключение
ВызватьИсключение ОписаниеОшибки();
КонецПопытки;
Для каждого стр из ТЗ Цикл
КоличествоПопыток = 1;
Пока КоличествоПопыток <= 3 Цикл
Попытка
ОтправитьСообщениеRabbit(ПараметрыПодключения, КлиентКомпоненты, стр.ТекстСообщения);
Исключение
ТекстОшибки = ОписаниеОшибки(); // Из GetLastError возвращается "AMQP server timeout error"
Сообщить(ТекстОшибки);
КоличествоПопыток = КоличествоПопыток + 1;
Продолжить;
КонецПопытки;
КонецЦикла;
Если НЕ КоличествоПопыток <= 3 Тогда
КлиентКомпоненты = Неопределено; // Если не получилось переотправить, после этой строки начинается алгоритм заново
ВызватьИсключение ТекстОшибки;
КонецЕсли;
КонецЦикла;
КлиентКомпоненты = Неопределено; // Если получилось переотправить, начинается алгоритм заново
КонецПроцедуры
Метод Клиент.GetLastError() возвращает frame size exceeded, что это значит?
Возникает когда канал на чтение еще не закрыт, пытаемся отправить какое то сообщение в кролика.
Отправляю сообщение. Размер сообщения 17.1 Мб.
Исключения не происходит.
Но в очереди сообщения не появляется.
Уменьшил сообщение до 10 Мб, отправилось и появилось.
Есть какие-то ограничения по размеру сообщения? Или в методе зашит таймаут ожидания?
Есть предположение, что в linux версии компоненты после вызовыа метода Connect при бездействии (отсутствия трафика) происходит автоматический разрыв соединения. Нужно проверить и при необходимости исправить. Источник: #21
Способ воспроизведения:
Клиент = Новый("AddIn.BITERP.PinkRabbitMQ");
Клиент.Connect("127.0.0.1", 5672, "user", "password", "wrongvhost"); // МЕТОД ПРОХОДИТ ДАЖЕ ЕСЛИ VHOST НЕВЕРНЫЙ
Клиент.DeclareExchange(ИмяТочкиОбмена, "topic", Ложь, Истина, Ложь); // МЕТОД ЗАВИСАЕТ
Проблема связана с тем, что Connect выполняется на низком уровне сокетов, в которм нет API для обработки результата входящего ответа об ошибочной авторизации.
В планах есть релизавать методы по работе по протоколу WebSocket.
Учитавая что все неоходимое уже есть.
Если у пользователя, под которым произведено подключение, нет права публикации сообщений с определенным ключом маршрутизации, то метод BasicPublish не вызывает исключения, просто сообщение не будет опубликовано. Это очень затрудняет поиск проблемы, когда обмены перестают работать
Воспроизводится на версии 1.6. Способ воспроизведения:
Здравствуйте!
Пытаюсь разобраться, как правильно подписываться на очередь. Решаю задачу:
Пока Клиент.BasicConsumeMessage(Потребитель, ТекстСообщения, ТегСообщения, Таймаут) Цикл
// Работа с компонентой
// ...
//
ТекстСообщения = "";
ТегСообщения = 0; // при каждой итерации очищаем память по указателю, который неявной хранится в этой переменной
КонецЦикла;
показан бесконечный цикл чтения для поддержания соединения, поскольку сама компонента после вызова метода Connect() через некоторое время разрывает соединение? И мне нужно проверять ТекстСообщения на непустую строку и выполнять нужные мне действия?
Если да, то на сервере этот цикл следует поместить в ФоновоеЗадание, которое нужно один раз запустить, и пускай работает, верно?
А как на клиенте реализовать подобный цикл, чтобы не блокировать интерфейс? Я пока понял только как ПодключитьОбработчикОжидания, который раз в 10 секунд будет выполнять этот цикл (и выходить из него, если ТекстСообщения = "" после вызова метода BasicConsumeMessage).
Спасибо!
Чтобы можно было проверить работоспособность компоненты при внесении в нее изменений, до пулл-реквеста.
Пока кажется, что это могут быть юнит-тесты, поставляемые вместе с компонентой и интегрированные с гитхаб (травис)
RMQ.BasicPublish(Exchange, RoutingKey, Message, LivingTime, Persist) - Зависает.
есть точка обмена, у нее 2 очереди, из первой читаем нормально по ключу RoutingKey_1, и сразу же отправляем во вторую очередь по ключу RoutingKey_2 метод зависает. Ничего не происходит, приходится ребутить агента
если в цикле в котором читаются данные и канал не закрыт, попытаться отправить сообщение в кролика
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.