sogou / workflow Goto Github PK
View Code? Open in Web Editor NEWC++ Parallel Computing and Asynchronous Networking Framework
License: Apache License 2.0
C++ Parallel Computing and Asynchronous Networking Framework
License: Apache License 2.0
Wrong:
int main(void)
{
...
task->start();
return 0;
}
Right:
int main(void)
{
...
task->start();
pause(); // or better: wait_group.wait();
return 0;
}
项目地址:https://github.com/sogou/srpc
SRPC
是一个工程学角度设计完美的rpc项目,具有以下功能特点:
pb
或thrift
格式的接口描述文件,一键迁移基于thrift
的项目,性能更优。baidu-std
协议,支持thrift framed
协议,与brpc
,apache thrift
互通。支持sogou-std
协议,这个协议同时支持pb
和thrift
格式的接口描述。binary
,http/json
,http/binary
的传输,对业务无感。baidu-std
协议时,性能大多数场景大幅超越原生brpc
。而且更为易用。brpc
了。相同配置的机器,Windows性能优于Linux(iocp
厉害)。SRPC
是workflow上的一套协议。SRPC
优秀的表现也印证了Sogou C++ Workflow
的强大功能。tutorial-04-http_echo_server.cc
服务在接收到不正确的请求时,如接收到 hi\r\n\r\n,会崩溃。
vs2019 x64
HttpMessage 中append_output_body_nocopy 在什么时候释放内存?
和很多的mysql非官方客户端一样,我们目前不支持mysql 8默认的sha2认证方式。解决的方法是修改服务器配置,可以参考这个URL:https://blog.csdn.net/s634772208/article/details/81155068
另外请更新到workflow最新代码。
什么时候会开放windows版本的代码呢?
是否有一个user_guide文档,整体上介绍workflow的整体架构,以及核心的数据结构和API。
现在只看到一些不同场景的tutorial例子,了解起来比较分散
在执行cmake的时候出现如下错误
-- Detecting CXX compile features - done
CMAKE_C_FLAGS_DEBUG is -g
CMAKE_C_FLAGS_RELEASE is -O3 -DNDEBUG
CMAKE_C_FLAGS_RELWITHDEBINFO is -O2 -g -DNDEBUG
CMAKE_C_FLAGS_MINSIZEREL is -Os -DNDEBUG
CMAKE_CXX_FLAGS_DEBUG is -g
CMAKE_CXX_FLAGS_RELEASE is -O3 -DNDEBUG
CMAKE_CXX_FLAGS_RELWITHDEBINFO is -O2 -g -DNDEBUG
CMAKE_CXX_FLAGS_MINSIZEREL is -Os -DNDEBUG
CMake Error at /usr/local/share/cmake-3.8/Modules/FindPackageHandleStandardArgs.cmake:137 (message):
Could NOT find OpenSSL, try to set the path to OpenSSL root folder in the
system variable OPENSSL_ROOT_DIR (missing: OPENSSL_INCLUDE_DIR)
Call Stack (most recent call first):
/usr/local/share/cmake-3.8/Modules/FindPackageHandleStandardArgs.cmake:377 (_FPHSA_FAILURE_MESSAGE)
/usr/local/share/cmake-3.8/Modules/FindOpenSSL.cmake:387 (find_package_handle_standard_args)
src/CMakeLists.txt:3 (find_package)
但是机器上已经安装了openssl,按照cmake ../ -DOPENSSL_ROOT_DIR=/usr/local/Cellar/[email protected]/1.1.1d -DOPENSSL_LIBRARIES=/usr/local/Cellar/[email protected]/1.1.1d/lib来指定执行也还是报这个错误,请教一下
通讯引擎在handler线程被占满,或者用户callback执行时间过长,可能会出现对方断开连接而本地没有及时关闭socket fd,从而导致本地出现CLOSE_WAIT状态的TCP连接。这个问题不会造成严重后果,一般也不可见,但可能影响短连接服务的性能。
出现这个问题的原因是,通讯器里,连接上下文与socket fd的生命周期是一致的,在Communicator::realase_conn()里一起被释放:
void Communicator::release_conn(struct CommConnEntry *entry)
{
delete entry->conn;
if (!entry->service)
pthread_mutex_destroy(&entry->mutex);
if (entry->ssl)
SSL_free(entry->ssl);
close(entry->sockfd);
free(entry);
}
连接上下文类似一个shared_ptr,用户在callback过程中可以引用连接上下文,于是这个上下文至少需要保持到callback结束。在此过程中,连接被对方断开,本地socket fd无法立即关闭。同样,如果系统过于繁忙,没有handler线程处理结果,也会出现这个问题。
解决方案非常明确,我们需要在处理网络事件的poller线程里就关闭socket fd,而不是先把结果放进消息队列等待handler处理。于是我们需要把消息队列从poller里独立出来,poller通过callback的方式返回结果。这么做我们可能还可以减少消息队列的进出次数,对性能有一定帮助。此外,callback方式的poller对于我们实现streaming通讯引擎至关重要。
Workflow is a great framework, respect all your work
Wish you release a stable version for project use your framework and further update
msgqueue(消息队列)和entry对应的任务队列(list封装的)大概的功能是什么那?二者大概是如何配合使用的?
咨询一下,在wget wget_to_redis http_echo_server等tutorial测试中发现,向对等方发送消息均是通过Communicator::send_message_sync中的writev(entry->sockfd, vectors, cnt <= IOV_MAX ? cnt : IOV_MAX)来实现,那epoll.c中的__poller_handle_write的功能是?向对方发送消息时为何不用这个epoll.c中__poller_handle_write?__poller_handle_event和__poller_handle_notify具体的应用场景是什么那?(可以基于具体的场景或者业务谈一下设计时的考虑)
C++ Workflow增加对kafka协议的支持。
具有以下功能特点:
基于C++ Workflow内部的任务流机制实现,高效简洁。
支持kafka常用的消息生产、获取和消费者组等特性。
以插件形式发布,编译时通过make KAFKA=y,生成独立的类库。
搜狗实际业务锤炼,稳定可靠。
例如当收到/stop的请求URL是,http server关闭,方法如下:
#include <string.h>
#include <atomic>
#include “workflow/WFHttpServer.h”
extern void process(WFHttpTask *task);
WFHttpServer server(process);
void process(WFHttpTask *task)
{
if (strcmp(task->get_req()->get_request_uri(), “/stop”) == 0)
{
static std::atomic<int> flag;
if (flag++ == 0)
server.shutdown();
task->get_resp()->append_output_body(“<html>server stop</html>”);
return;
}
/* Server’s logic */
// ....
}
int main()
{
if (server.start(8888) == 0)
server.wait_finish();
return 0;
}
因为在WFServer.h里:
class WFServerBase : protected CommService
{
...
public:
void stop()
{
this->shutdown();
this->wait_finish();
}
void shutdown();
void wait_finish();
};
Server的stop方法无非就是shutdown+wait_finish,上述的方法就是将关停和等待结束分别在两个线程里调用(注意shutdown只能调用一次,因此我们用了原子变量保护),所以肯定是严密的。但显然在process里直接调用stop则是一种错误。
同理,关闭很多个server最好的方法是:
server1.start();
server2.start();
server3.start();
pause(); // or better: wait_group.wait();
server1.shutdown();
server2.shutdown();
server3.shutdown();
server1.wait_finish();
server2.wait_finish();
server3.wait_finish();
Hi, guys,
I want to use the workflow to build a RESTful server which can receiver image file from client and do some AI works. As the title says, how to get multi-part form file from the http request? Can you share a code example?
Thanks!
这个项目很棒,一次编译过,示例代码也能正常运行。
我看了下代码,涉及到 SSL 的地方其实不多,能否改造一下,把这个依赖变为可选项呢?
workflow 内部线程池的设计方式本身是有瓶颈的,其是一种常规的线程池设计模型,但当线程池中线程数较大时会因 pthread_cond_signal 的设计导致惊群问题(具体原因可以 man 一下 pthread_cond_signal 或 https://blog.csdn.net/zsxxsz/article/details/88388425 );
另外,与 nginx 的性能对比可能也存在问题,你们不妨把 nginx 的日志关闭再对比测试一下(我对比测试后发现nginx性能还是远高于workflow的)。nginx 因为其单线程非阻塞的设计方式,其CPU亲和性要远好于线程池的设计方式,其性能基本是随着CPU核数和进程数增加呈线性增长的,而 workflow 中的 demo 在线程数(其中的 pollers 和 handlers)到一定数时就无法再提升了,应该还是线程池中的锁碰撞和 CPU 亲和性等原因);
还有就是nginx的 HTTP 协议解析过程是规范的(从其对url的解析便可以看出),并且功能逻辑也是较为复杂的。
rt
除了头Connection字段,还有什么点需要处理呢?抓包发现基本上每个请求都建立了连接。
请教下,SubTask.cc中任务分发及任务处理中红色框框中大概的含义是什么那?dispath与done均调用subtask_done,看到if条件,但不清楚具体的条件是什么?能否解答下(不论是并行还是串行,任务的分发都是顺序的,只不过分发到不同的request,如dns dispatch到WFRouterTask::dispatch,http task dispatch到WFComplexClientTask::dispatch,进而Communicator::request,但这个过程是通过ParallelTask::dispatch()顺序依次分发的);另外从服务端接收到数据,放进msgqueue,网络线程池触发handle,每次都必须调用subtask_done,最终调用用户注册的回掉函数,如果是基于此,并行处理的主要机制其实就是利用queue的特性,多个线程调用各自的注册的回掉函数;
另外,附加问一个小问题,类似这样的硬编码为1 (node == (struct __poller_node *)1)是什么含义那?
看了tutorial-05-http_proxy.cc例子,中转服务时,需要在process的同个series上添加task,那么process最后回复主调方的时机是什么时候,等添加的task都执行完成?
看例子只有http的,可以作为tcp,udp服务器吗?
When I build the tutorial and try do download sth with wget, it print outs a lot of logs, such as below:
tutorial/wget https://xxxx
#output
HTTP/1.1 200 OK^M
Server: nginx/1.15.6^M
Date: Mon, 28 Dec 2020 01:49:37 GMT^M
Content-Type: application/octet-stream^M
Content-Length: 485985932^M
Last-Modified: Wed, 23 Dec 2020 11:28:39 GMT^M
Connection: close^M
ETag: "5fe329e7-1cf78e8c"^M
Accept-Ranges: bytes^M
^M
^_<8b>^H^@^V!¶]^@^Cì=isã6<96>ùì_<81>u¾Ø^ZÇÝ<92>ÝGº7SEK<94>Í<89>®PT;=©<94><8a><96> <8b>^SJÔð°¬Lå¿ï{^@xß2<9d>ÝTk&-^Ax^O^OïÆ©<85>·Ôß^XÛ<85>é-é<9b><85>·Ün/×ß4ü÷^Vþ®¯¯Ù¿ð^Wÿ÷úí<87>«vû<9b>öÕ»ë«ÎÕuû^C<94>·¯®ß¶¿!o<9b>&$ëÏs\Ý&ä^[۲ܢveõ^?Ñ¿7^SÒ"]kw°<8d>ǵKÚß^?^?õ]çmû^]^Y}Qz<8a>^DUöβu×°¶<97><84>H¦IXC<87>ØÔ¡ö^S]^^B^BÄ1^ZkJW&Ú<98>^LàßÑT<96>?<89>^Zmm8ı<{AÉÂZR¢o<97>o,<9b>,<85>·¡[<97>!'g§^CcA·^N]<92>^^5<8d>'jë^O&uNÏ<89>nSDãx^Oÿ¢^K<97>¸<96>O<9b>±u©iB<99>§<9b>dg[;j»^G<9f>>o»¤6<99>]N/±?D<80>Íí-ë^MÚ<87>c6õ½s^YÐ
Ã"<99><94>^@í@«±<85>â5µ©±%0¬<89>:<9e>¨<8a>¬IêW¿<9b>îxÔWzòHS¤A<84>X¨E<80>^G^@|Db<9f><8c>%`âTºkJ<80>¶<8d>ãã<80>®<96>^F^Rê^PkEt²²ì^M~^R¸^\kåî<81>-Ää<84>^RýѦ^TyI^^^N><8e>^Gêî)ÝFû^Wã¢!³<89>äC^B£A&^TÙi[[c¡<9b>æ^AÑè<8b>^Eݹ@*`öáA^SF<96>»7Ü5X^OP
刚接触,通过现有文档可能有一些问题还不太清楚。
Hi, guys,
I just get a 76-byte libworkflow.so and 11.7M libworkflow.a after I run the make command in the project root folder. How to generate a real dynamic shared library?
Even using the libworkflow.a library, I can't compile my code. The error is below:
XXXXX@XXXXX-HPWS:~/personal/gitee/dl4cpp$ make
g++ -g -std=c++11 -Wall -I./sogou-workflow/include -L./sogou-workflow/lib -l:libworkflow.a src/main.cc -o src/main.o
/tmp/cc1vPAdt.o: In function protocol::HttpMessage::append_output_body(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/HttpMessage.h:159: undefined reference to
protocol::HttpMessage::append_output_body(void const*, unsigned long)'
/tmp/cc1vPAdt.o: In function EncodeStream::~EncodeStream()': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/EncodeStream.h:101: undefined reference to
EncodeStream::clear_buffer()'
/tmp/cc1vPAdt.o: In function protocol::RedisValue::~RedisValue()': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/RedisMessage.h:228: undefined reference to
protocol::RedisValue::free_data()'
/tmp/cc1vPAdt.o: In function protocol::RedisMessage::~RedisMessage()': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/RedisMessage.h:324: undefined reference to
redis_parser_deinit'
/tmp/cc1vPAdt.o: In function WFServerBase::WFServerBase(WFServerParams const*)': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/WFServer.h:56: undefined reference to
vtable for WFServerBase'
/tmp/cc1vPAdt.o: In function WFServerBase::start(unsigned short)': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/WFServer.h:69: undefined reference to
WFServerBase::start(int, char const*, unsigned short, char const*, char const*)'
/tmp/cc1vPAdt.o: In function WFServerBase::stop()': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/WFServer.h:133: undefined reference to
WFServerBase::shutdown()'
/home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/WFServer.h:134: undefined reference to WFServerBase::wait_finish()' /tmp/cc1vPAdt.o: In function
WFServerBase::~WFServerBase()':
/home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/WFServer.h:52: undefined reference to vtable for WFServerBase' /tmp/cc1vPAdt.o: In function
WFServer<protocol::HttpRequest, protocol::HttpResponse>::new_session(long long, CommConnection*)':
/home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/WFHttpServer.h:53: undefined reference to WFServerTaskFactory::create_http_task(std::function<void (WFNetworkTask<protocol::HttpRequest, protocol::HttpResponse>*)>&)' /tmp/cc1vPAdt.o:(.rodata._ZTV8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE[_ZTV8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE]+0x20): undefined reference to
WFServerBase::handle_unbound()'
/tmp/cc1vPAdt.o:(.rodata._ZTV8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE[_ZTV8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE]+0x28): undefined reference to WFServerBase::create_listen_fd()' /tmp/cc1vPAdt.o:(.rodata._ZTV8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE[_ZTV8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE]+0x30): undefined reference to
WFServerBase::new_connection(int)'
/tmp/cc1vPAdt.o:(.rodata._ZTVN8protocol12RedisMessageE[_ZTVN8protocol12RedisMessageE]+0x10): undefined reference to protocol::RedisMessage::encode(iovec*, int)' /tmp/cc1vPAdt.o:(.rodata._ZTVN8protocol12RedisMessageE[_ZTVN8protocol12RedisMessageE]+0x28): undefined reference to
protocol::RedisMessage::append(void const*, unsigned long*)'
/tmp/cc1vPAdt.o:(.rodata._ZTVN8protocol12RedisMessageE[_ZTVN8protocol12RedisMessageE]+0x48): undefined reference to non-virtual thunk to protocol::RedisMessage::append(void const*, unsigned long*)' /tmp/cc1vPAdt.o:(.rodata._ZTI8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE[_ZTI8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE]+0x10): undefined reference to
typeinfo for WFServerBase'
collect2: error: ld returned 1 exit status
GNUmakefile:15: recipe for target 'src/main.o' failed
make: *** [src/main.o] Error 1
如题
请问在哪里能看到benchmark的代码,benchmark的链接里只有图,希望能把测试代码放出来。
Communicator.h:28:10: fatal error: 'openssl/ssl.h' file not found
#include <openssl/ssl.h>
我打算实现一个http服务,提供一个接口用来并行获取若干个http url的响应结果并汇总返回。基本上算是合并了tutorial-06-parallel_wget和tutorial-04-http_echo_server的功能。
存在的一点区别是tutorial-06-parallel_wget
只是在ParallelWork
的callback
中往标准输出中打印了每个请求的结果,我的需求是想要处理一下这些抓取结果并汇总作为http 接口的返回。
我想到的一个方案是在ParallelWork的callback里获取到当前http server task(不是很确认正确的获取方式),并设置响应。
void callback(const ParallelWork *pwork)
{
tutorial_series_context *ctx;
const void *body;
size_t size;
size_t i;
// 获取到当前http server task
HttpResponse *resp = task->get_resp();
for (i = 0; i < pwork->size(); i++)
{
ctx = (tutorial_series_context *)pwork->series_at(i)->get_context();
printf("%s\n", ctx->url.c_str());
if (ctx->state == WFT_STATE_SUCCESS)
{
ctx->resp.get_parsed_body(&body, &size);
resp->append_output_body_nocopy(body, size);
}
else
printf("ERROR! state = %d, error = %d\n", ctx->state, ctx->error);
delete ctx;
}
}
int process(WFHttpTask *server_task)
{
ParallelWork *pwork = Workflow::create_parallel_work(callback);
SeriesWork *series;
WFHttpTask *task;
HttpRequest *req;
tutorial_series_context *ctx;
int i;
for (i = 1; i < 100; i++)
{
std::string url = "http://localhost:8100/worker";
task = WFTaskFactory::create_http_task(
url,
REDIRECT_MAX, RETRY_MAX,
[](WFHttpTask *task) {
tutorial_series_context *ctx =
(tutorial_series_context *)series_of(task)->get_context();
ctx->state = task->get_state();
ctx->error = task->get_error();
ctx->resp = std::move(*task->get_resp());
});
req = task->get_req();
req->add_header_pair("Accept", "*/*");
req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");
req->add_header_pair("Connection", "close");
ctx = new tutorial_series_context;
ctx->url = std::move(url);
series = Workflow::create_series_work(task, nullptr);
series->set_context(ctx);
pwork->add_series(series);
}
WFFacilities::WaitGroup wait_group(1);
Workflow::start_series_work(pwork, [&wait_group](const SeriesWork *) {
wait_group.done();
});
wait_group.wait();
return 0;
}
想知道如何在ParallelWork里获取到当前请求的Http server task,或者把http server task设置到ParallelWork的上下文中
在linux下轻松的编译过了, 但是...
我切换到windows分之后, 编译无法通过, 希望有相关的文档可以参考,
修改代码 添加openssl库
[src/CMakeLists.txt :2]
cmake配置: Visual Studio 12 x86 Release
报错如下
[main] 正在生成文件夹: workflow
[build] 正在启动生成
...
...
[build] D:\workflow\src\util\URIParser.cc(351): warning C4996: '_strdup': The POSIX name for this item is deprecated. Instead, use the ISO C and C++ conformant name: _strdup. See online help for details. [D:\workflow\build\src\util\util.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(235): error C2011: “sockaddr”:“struct”类型重定义 (编译源文件 D:\workflow\src\server\WFServer.cc) [D:\workflow\build\src\server\server.vcxproj]
[build] D:\workflow\src\client\WFMySQLConnection.cc(41): warning C4996: '_strdup': The POSIX name for this item is deprecated. Instead, use the ISO C and C++ conformant name: _strdup. See online help for details. [D:\workflow\build\src\client\client.vcxproj]
[build] D:\workflow_include\workflow\RedisMessage.h(329): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\HttpTaskImpl.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] D:\workflow_include\workflow\RedisMessage.h(329): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\MySQLTaskImpl.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\ucrt\string.h(143): note: 参见“_strdup”的声明
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(1007): note: 参见“sockaddr”的声明 (编译源文件 D:\workflow\src\server\WFServer.cc)
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\ucrt\string.h(143): note: 参见“_strdup”的声明
[build] D:\workflow\src\util\URIParser.cc(358): warning C4996: '_strdup': The POSIX name for this item is deprecated. Instead, use the ISO C and C++ conformant name: _strdup. See online help for details. [D:\workflow\build\src\util\util.vcxproj]
[build] D:\workflow_include\workflow\HttpMessage.h(62): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\HttpTaskImpl.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(437): error C2059: 语法错误:“常量” (编译源文件 D:\workflow\src\server\WFServer.cc) [D:\workflow\build\src\server\server.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(437): error C3805: “常量”: 意外标记,应输入“}”或者“,” (编译源文件 D:\workflow\src\server\WFServer.cc) [D:\workflow\build\src\server\server.vcxproj]
[build] D:\workflow_include\workflow\HttpMessage.h(67): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\HttpTaskImpl.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(572): warning C4005: “IN_CLASSA”: 宏重定义 (编译源文件 D:\workflow\src\server\WFServer.cc) [D:\workflow\build\src\server\server.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(284): note: 参见“IN_CLASSA”的前一个定义 (编译源文件 D:\workflow\src\server\WFServer.cc)
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\ucrt\string.h(143): note: 参见“_strdup”的声明
[build] D:\workflow_include\workflow\HttpMessage.h(62): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\MySQLTaskImpl.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] D:\workflow_include\workflow\HttpMessage.h(67): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\MySQLTaskImpl.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] D:\workflow_include\workflow\RedisMessage.h(329): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\WFTaskFactory.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] D:\workflow_include\workflow\HttpMessage.h(62): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\WFTaskFactory.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(578): warning C4005: “IN_CLASSB”: 宏重定义 (编译源文件 D:\workflow\src\server\WFServer.cc) [D:\workflow\build\src\server\server.vcxproj]
[build] D:\workflow\src\util\URIParser.cc(365): warning C4996: '_strdup': The POSIX name for this item is deprecated. Instead, use the ISO C and C++ conformant name: _strdup. See online help for details. [D:\workflow\build\src\util\util.vcxproj]
[build] D:\workflow_include\workflow\HttpMessage.h(67): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\WFTaskFactory.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] D:\workflow\src\factory\WFTaskFactory.cc(55): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) [D:\workflow\build\src\factory\factory.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(290): note: 参见“IN_CLASSB”的前一个定义 (编译源文件 D:\workflow\src\server\WFServer.cc)
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(457): note: 参见“AF_IPX”的前一个定义 (编译源文件 D:\workflow\src\manager\RouteManager.cc)
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(147): warning C4005: “AF_MAX”: 宏重定义 (编译源文件 D:\workflow\src\manager\RouteManager.cc) [D:\workflow\build\src\manager\manager.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(476): note: 参见“AF_MAX”的前一个定义 (编译源文件 D:\workflow\src\manager\RouteManager.cc)
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(185): warning C4005: “SO_DONTLINGER”: 宏重定义 (编译源文件 D:\workflow\src\manager\RouteManager.cc) [D:\workflow\build\src\manager\manager.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(399): note: 参见“SO_DONTLINGER”的前一个定义 (编译源文件 D:\workflow\src\manager\RouteManager.cc)
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(235): error C2011: “sockaddr”:“struct”类型重定义 (编译源文件 D:\workflow\src\manager\RouteManager.cc) [D:\workflow\build\src\manager\manager.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(1007): note: 参见“sockaddr”的声明 (编译源
.
.
.
.
.
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(1007): note: 参见“sockaddr”的声明 (编译源文件 D:\workflow\src\protocol\HttpUtil.cc)
[build] D:\workflow_include\workflow\Communicator.h(204): error C2227: “->sa_family”的左边必须指向类/结构/联合/泛型类型 (编译源文件 D:\workflow\src\protocol\HttpUtil.cc) [D:\workflow\build\src\protocol\protocol.vcxproj]
[build] d:\workflow\src\protocol\HttpMessage.h(62): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\protocol\HttpUtil.cc) [D:\workflow\build\src\protocol\protocol.vcxproj]
[build] d:\workflow\src\protocol\HttpMessage.h(67): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\protocol\HttpUtil.cc) [D:\workflow\build\src\protocol\protocol.vcxproj]
[build] 生成已完成,退出代码为 1
Hi, I'm playing with workflow + protobuf, here's my example:
the server
#include <workflow/ProtocolMessage.h>
#include <workflow/WFTask.h>
#include <workflow/WFServer.h>
#include <workflow/WFFacilities.h>
#include "proto/common.pb.h"
using namespace std;
namespace protocol {
class OrderMessage : public ProtocolMessage {
public:
trade::Order order;
private:
int encode(struct iovec *vectors, int max) override {
int size = order.ByteSizeLong();
void *buffer = new char[size];
order.SerializePartialToArray(buffer, size);
vectors[0].iov_base = buffer;
vectors[0].iov_len = size;
return 1;
}
int append(const void *buf, size_t *size) override {
order.ParsePartialFromArray(buf, size[0]);
return 1;
}
};
}
using WFTestTask = WFNetworkTask<protocol::OrderMessage, protocol::OrderMessage>;
void process(WFTestTask *task)
{
protocol::OrderMessage *req = task->get_req();
protocol::OrderMessage *resp = task->get_resp();
cout << req->order.price() << endl;
resp->order.set_price("123");
}
static WFFacilities::WaitGroup wait_group(1);
int main()
{
using WFTestServer = WFServer<protocol::OrderMessage, protocol::OrderMessage>;
WFServerParams params = SERVER_PARAMS_DEFAULT;
WFTestServer server(¶ms, process);
if (server.start(AF_INET, 8080) == 0) {
cout << "success!" << endl;
wait_group.wait();
server.stop();
} else {
}
return 0;
}
the client
using WFTestTask = WFNetworkTask<protocol::OrderMessage, protocol::OrderMessage>;
using tutorial_callback_t = std::function<void (WFTestTask *)>;
class MyFactory : public WFTaskFactory
{
public:
static WFTestTask *create_tutorial_task(const std::string& host,
unsigned short port,
int retry_max,
tutorial_callback_t callback)
{
using NTF = WFNetworkTaskFactory<protocol::OrderMessage, protocol::OrderMessage>;
WFTestTask *task = NTF::create_client_task(TT_TCP, host, port,
retry_max,
std::move(callback));
task->set_keep_alive(30 * 1000);
return task;
}
};
int main()
{
std::string host = "0.0.0.0";
int port = 8080;
std::function<void (WFTestTask *task)> callback =
[&host, port, &callback](WFTestTask *task) {
int state = task->get_state();
int error = task->get_error();
cout << state << " " << error << endl;
if (state != WFT_STATE_SUCCESS)
{
if (state == WFT_STATE_SYS_ERROR)
fprintf(stderr, "SYS error: %s\n", strerror(error));
else if (state == WFT_STATE_DNS_ERROR)
fprintf(stderr, "DNS error: %s\n", gai_strerror(error));
else
fprintf(stderr, "other error.\n");
return;
}
protocol::OrderMessage *resp = task->get_resp();
cout << resp->order.price() << endl;
};
WFFacilities::WaitGroup wait_group(1);
WFTestTask *task = MyFactory::create_tutorial_task(host, port, 0, callback);
auto req = task->get_req();
req->order.set_price("333");
task->start();
}
and I got the no reply error (namely state = 2, error = 0) on the client side. Any suggestion? Thanks !
第一类问题:WFGraphNode是DAG 其中的一个节点(node),但为什么是基于WFCounterTask类实现的(node和WFCounterTask难道有什么关系吗);WFContainerTask也是基于WFCounterTask实现的,能否也解释下?多入边的节点都需要一个counter实现,这是有什么历史背景吗?还是有和渊源?我理解一个DAG 包含多个node(每个node其实就是一个任务),但是不清楚workflow中基于WFCounterTask类实现的node是基于何种场景考虑的?(node为什么基于WFCounterTask实现?)
第二类问题(仅仅是个人建议):在具体的task实现中,FileIOTask等类似的任务实现是在WFTaskFactory.cc中实现的,能否可以像HttpTaskInpl.cc似的,单独弄一个文件完成FileIOTask的实现,其他任务实现也类似这么做(目前实现了http,redis等常规的)(这样的好处是比较容易理解,通过文件了解整体框架脉络(一层一层继承),比较清楚);
task之间如何顺序传递数据?task1的输出如何自然的(最好还是免复制的)传递给task2的输入?
Workflow需要依赖OpenSSL(推荐1.1及以上版本)和Cmake(要求3.6以上版本),以下为安装步骤:
安装 OpenSSL
brew install openssl
安装 CMake
brew install cmake
指定 OpenSSL
环境变量
由于MacOS下默认有LibreSSL,因此在brew安装后,并不会自动建软链,我们需要手动把执行路径、编译路径、cmake时的find_package路径都配置到bash的环境变量中。用户可以执行brew info openssl
查看相关信息,也可以如下配置:
echo 'export PATH="/usr/local/opt/[email protected]/bin:$PATH"' >> ~/.bash_profile
echo 'export LDFLAGS="-L/usr/local/opt/[email protected]/lib"' >> ~/.bash_profile
echo 'export CPPFLAGS="-I/usr/local/opt/[email protected]/include"' >> ~/.bash_profile
echo 'export PKG_CONFIG_PATH="/usr/local/opt/[email protected]/lib/pkgconfig"' >> ~/.bash_profile
echo 'export OPENSSL_ROOT_DIR=/usr/local/opt/openssl' >> ~/.bash_profile
echo 'export OPENSSL_LIBRARIES=/usr/local/opt/openssl/lib' >> ~/.bash_profile
如果使用zsh,则还需要以下一步,把bash的配置加载一下:
echo 'test -f ~/.bash_profile && source ~/.bash_profile' >> ~/.zshrc
source ~/.zshrc
希望能增加一个示例:中转服务,收到请求后,并行调用多个下游服务,然后将每个下游服务的回复转发回主调方。
Install OpenSSL
brew install openssl
Install CMake
brew install cmake
Build with OpenSSL
mkdir build && cd build
cmake -DOPENSSL_ROOT_DIR=/usr/local/opt/openssl ..
make -j8
Hi, I got the following build errors on mac, any suggestion? Thanks!
[ 33%] Linking CXX executable test
Undefined symbols for architecture x86_64:
"_BIO_free", referenced from:
Communicator::handle_listen_result(poller_result*) in libworkflow.a(Communicator.cc.o)
Communicator::handle_connect_result(poller_result*) in libworkflow.a(Communicator.cc.o)
"_BIO_new_socket", referenced from:
Communicator::handle_listen_result(poller_result*) in libworkflow.a(Communicator.cc.o)
Communicator::handle_connect_result(poller_result*) in libworkflow.a(Communicator.cc.o)
"_CONF_modules_unload", referenced from:
__SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_CRYPTO_THREADID_set_callback", referenced from:
__SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_CRYPTO_cleanup_all_ex_data", referenced from:
__SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_CRYPTO_num_locks", referenced from:
__SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_CRYPTO_set_locking_callback", referenced from:
__SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
__SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_ENGINE_cleanup", referenced from:
__SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_ERR_free_strings", referenced from:
__SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_ERR_remove_thread_state", referenced from:
__SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
___poller_thread_routine in libworkflow.a(poller.c.o)
"_EVP_cleanup", referenced from:
__SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_FIPS_mode_set", referenced from:
__SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_MD5_Final", referenced from:
MD5Util::md5_bin(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
MD5Util::md5_string_32(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
MD5Util::md5_string_16(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
MD5Util::md5_integer_32(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
MD5Util::md5_integer_16(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
"_MD5_Init", referenced from:
MD5Util::md5_bin(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
MD5Util::md5_string_32(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
MD5Util::md5_string_16(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
MD5Util::md5_integer_32(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
MD5Util::md5_integer_16(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
"_MD5_Update", referenced from:
MD5Util::md5_bin(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
MD5Util::md5_string_32(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
MD5Util::md5_string_16(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
MD5Util::md5_integer_32(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
MD5Util::md5_integer_16(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
"_SSL_COMP_get_compression_methods", referenced from:
__SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_SSL_CTX_free", referenced from:
__SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_SSL_CTX_new", referenced from:
__SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_SSL_CTX_set_verify", referenced from:
WFServerBase::init(sockaddr const*, unsigned int, char const*, char const*) in libworkflow.a(WFServer.cc.o)
"_SSL_CTX_use_PrivateKey_file", referenced from:
WFServerBase::init(sockaddr const*, unsigned int, char const*, char const*) in libworkflow.a(WFServer.cc.o)
"_SSL_CTX_use_certificate_file", referenced from:
WFServerBase::init(sockaddr const*, unsigned int, char const*, char const*) in libworkflow.a(WFServer.cc.o)
"_SSL_accept", referenced from:
___poller_thread_routine in libworkflow.a(poller.c.o)
"_SSL_connect", referenced from:
___poller_thread_routine in libworkflow.a(poller.c.o)
"_SSL_free", referenced from:
Communicator::release_conn(CommConnEntry*) in libworkflow.a(Communicator.cc.o)
Communicator::handle_incoming_request(poller_result*) in libworkflow.a(Communicator.cc.o)
Communicator::handle_incoming_reply(poller_result*) in libworkflow.a(Communicator.cc.o)
Communicator::handle_reply_result(poller_result*) in libworkflow.a(Communicator.cc.o)
Communicator::handle_request_result(poller_result*) in libworkflow.a(Communicator.cc.o)
Communicator::handle_listen_result(poller_result*) in libworkflow.a(Communicator.cc.o)
Communicator::handle_connect_result(poller_result*) in libworkflow.a(Communicator.cc.o)
...
"_SSL_get_error", referenced from:
CommMessageIn::feedback(char const*, unsigned long) in libworkflow.a(Communicator.cc.o)
___poller_handle_ssl_error in libworkflow.a(poller.c.o)
"_SSL_library_init", referenced from:
__SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_SSL_load_error_strings", referenced from:
__SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_SSL_new", referenced from:
Communicator::handle_listen_result(poller_result*) in libworkflow.a(Communicator.cc.o)
Communicator::handle_connect_result(poller_result*) in libworkflow.a(Communicator.cc.o)
"_SSL_read", referenced from:
___poller_thread_routine in libworkflow.a(poller.c.o)
"_SSL_set_bio", referenced from:
Communicator::handle_listen_result(poller_result*) in libworkflow.a(Communicator.cc.o)
Communicator::handle_connect_result(poller_result*) in libworkflow.a(Communicator.cc.o)
"_SSL_shutdown", referenced from:
___poller_thread_routine in libworkflow.a(poller.c.o)
"_SSL_write", referenced from:
CommMessageIn::feedback(char const*, unsigned long) in libworkflow.a(Communicator.cc.o)
___poller_thread_routine in libworkflow.a(poller.c.o)
"_SSLv23_client_method", referenced from:
__SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_SSLv23_server_method", referenced from:
__SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
"_sk_free", referenced from:
__SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[3]: *** [test] Error 1
make[2]: *** [CMakeFiles/test.dir/all] Error 2
make[1]: *** [CMakeFiles/test.dir/rule] Error 2
make: *** [test] Error 2
my CMakeLists.txt:
cmake_minimum_required(VERSION 3.13)
set(CMAKE_CXX_STANDARD 20)
find_package(WorkFlow REQUIRED)
add_executable(test test.cpp proto/common.pb.cc)
target_link_libraries(test -lworkflow -lprotobuf)
1.创建mysql任务执行数据库的插入,插入记录成功,但是会出现Abnormal packet_type=1。想请教下一般可能的原因是什么,控制台输出来自
workflow/tutorial/tutorial-12-mysql_cli.cc
Lines 203 to 206 in 6ffb7e0
2.对于需要插入多条记录的任务,是分别每条记录建立一个task比较好呢还是建立一个task执行多条insert语句好呢。
数据库小白,问的问题可能比较zz ==!
C++ Workflow项目起源于搜狗公司的分布式存储项目的通讯引擎,并且发展成为搜狗公司级C++标准,应用于搜狗大多数C++后端服务。项目将通讯与计算和谐统一,帮助用户建立通讯与计算关系非常复杂的高性能服务。但同时用户也可以只把它当成简易的异步网络引擎或并行计算框架来使用。
以Linux系统为例:
$ git clone https://github.com/sogou/workflow
$ cd workflow
$ make
$ cd tutorial
$ make
然后就可以愉快的运行示例了。每个示例都有对应的文档讲解。如果需要用到kafka协议,请预先安装snappy和lz4,并且:
$ make KAFKA=y
$ cd tutorial
$ make KAFKA=y
另外,make DEBUG=y
,可以编译调试版。通过make REDIS=n MYSQL=n UPSTREAM=n CONSUL=n
可以裁剪掉一个或多个功能,让库文件减小到最低400KB,更加适合嵌入式开发。
目前我们实现了HTTP,Redis,MySQL和kafka协议。除kafka目前只支持客户端以外,其他协议都是client+server。也就是说,用户可以用于构建Redis或MySQL协议的代理服务器。kafka模块是插件,默认不编译。
我们用C++11 std::function类型的callback和process来包装用户行为,因此用户需要知道自己是在编写异步程序。我们认为callback方式比future或用户态协程能给程序带来更高的效率,并且能很好的实现通信与计算的统一。由于我们的任务封装方式以及std::function带来的便利,在我们的框架里使用callback并没有太多心智负担,反而非常简单明了。
项目的一个特点是由框架来管理线程,除了一些很特殊情况,callback的调用线程必然是处理网络收发和文件IO结果的handler线程(默认数量20)或者计算线程(默认数量等于CPU总核数)。但无论在哪个线程里执行,都不建议在callback里等待或执行特别复杂的计算。需要等待可以用counter任务进行不占线程的wait,复杂计算则应该包装成计算任务。
需要说明的是,框架里的一切资源都是使用时分配。如果用户没有用到网络通信,那么所有和通信相关的线程都不会被创建。
int main(void)
{
...
task->start();
return 0;
}
这是很多新用户都会遇到的问题。框架中几乎所有调用都是非阻塞的,上面的代码在task启动之后main函数立刻return,并不会等待task的执行结束。正确的做法应该是通过某种方式在唤醒主进程,例如:
WFFaciliies::WaitGroup wait_group(1);
void callback(WFHttpTask *task)
{
....
wait_group.done();
}
int main(void)
{
WFHttpTask *task = WFTaskFactory::create_http_task(url, 0, 0, callback);
task->start();
wait_group.wait();
return 0;
}
框架中任何任务(以及SeriesWork),都是以裸指针形式交给用户。所有任务对象的生命周期,是从对象被创建,到对象的callback完成。也就是说callback之后task指针也就失效了,同时被销毁的也包括task里的数据。如果你需要保留数据,可以用std::move()把数据移走,例如我们需要保留http任务中的resp:
void http_callback(WFHttpTask *task)
{
protocol::HttpResponse *resp = task->get_resp();
protocol::HttpResponse *my_resp = new protocol::HttpResponse(std::move(*resp));
/* or
protocol::HttpResponse *my_resp = new protocol::HttpResponse;
*my_resp = std::move(*resp);
*/
}
某些情况下,如果用户创建完任务又不想启动了,那么需要调用task->dismiss()直接销毁任务。
需要特别强调,server的process函数不是callback,server任务的callback发生在回复完成之后,而且默认为nullptr。
我们关于串并联的定义是:
显然通过这三句话的定义我们可以递归出任意复杂的串并联结构。如果把串行也定义为一种任务,串行就可以由多个子串行组成,那么使用起来就很容易陷入混乱。同样并行只能是若干串行的并,也是为了避免混乱。其实使用中你会发现,串行本质上就是我们的协程。
可以使用WFGraphTask,或自己用WFCounterTask来构造。
示例:https://github.com/sogou/workflow/blob/master/tutorial/tutorial-11-graph_task.cc
不是。server是在server task所在series没有别的任务之后回复请求。如果你不向这个series里添加任何任务,就相当于process结束之后回复。注意不要在process里等待任务的完成,而应该把这个任务添加到series里。
错误的方法是在process里直接sleep。正确做法,向server所在的series里添加一个timer任务。以http server为例:
void process(WFHttpTask *server_task)
{
WFTimerTask *timer = WFTaskFactory::create_timer_task(100000, nullptr);
server_task->get_resp()->append_output_body("hello");
series_of(server_task)->push_back(timer);
}
以上代码实现一个100毫秒延迟的http server。一切都是异步执行,等待过程没有线程被占用。
首先回复成功的定义是成功把数据写入tcp缓冲,所以如果回复包很小而且client端没有因为超时等原因关闭了连接,几乎可以认为一定回复成功。需要查看回复结果,只需给server task设置一个callback,callback里状态码和错误码的定义与client task是一样的,但server task不会出现dns错误。
可以。任何时候调用server task的noreply()方法,那么在原本回复的时机,连接直接关闭。
我们发现包括WFGoTask在内的所有计算任务,在创建时都需要指定一个计算队列名,这个计算队列名可用于指导我们内部的调度策略。首先,只要有空闲计算线程可用,任务将实时调起,计算队列名不起作用。当计算线程无法实时调起每个任务的时候,那么同一队列名下的任务将按FIFO的顺序被调起,而队列与队列之间则是平等对待。例如,先连续启动n个队列名为A的任务,再连续启动n个队列名为B的任务。那么无论每个任务的cpu耗时分别是多少,也无论计算线程数多少,这两个队列将近倾向于同时执行完毕。这个规律可以扩展到任意队列数量以及任意启动顺序。
首先看一下redis client任务的创建接口:
class WFTaskFactory
{
public:
WFRedisTask *create_redis_task(const std::string& url, int retry_max, redis_callback_t callback);
}
其中url的格式为:redis://:password@host:port/dbnum。port默认值为6379,dbnum默认值为0。
workflow的一个重要特点是由框架来管理连接,使用户接口可以极致的精简,并实现最有效的连接复用。框架根据任务的用户名密码以及dbnum,来查找一个可以复用的连接。如果找不到则发起新连接并进行用户登陆,数据库选择等操作。如果是一个新的host,还要进行DNS解析。请求出错还可能retry。这每一个步骤都是异步并且透明的,用户只需要填写自己的request,将任务启动,就可以在callback里得到请求的结果。唯一需要注意的是,每次任务的创建都需要带着password,因为可能随时有登陆的需要。
同样的方法我们可以用来创建mysql任务。但对于有事务需求的mysql,则需要通过我们的WFMySQLConnection来创建任务了,否则无法保证整个事务都在同一个连接上进行。WFMySQLConnection依然能做到连接和认证过程的异步性。
大多数情况下,用户使用框架产生的client任务都是无法指定具体连接。框架会有连接的复用策略:
虽然我们的框架无法指定任务要使用的连接,但是我们支持连接上下文的功能。这个功能对于实现有连接状态的server非常重要。相关的内容可以参考关于连接上下文相关文档。
是的,我们会认为同一域名下的所有目标IP对等,服务能力也相同。因此任何一个请求都会寻找一个从本地看起来负载最轻的目标进行通信,同时也内置了熔断与恢复策略。同一域名下的负载均衡,目标都必须服务在同一端口,而且无法配置不同权重。负载均衡的优先级高于连接复用,也就是说会先选择好通信地址再考虑复用连接问题。
可以参考upstream相关文档。upstream还可以实现很多更复杂的服务管理需求。
很多情况下我们使用HttpMessage::get_parsed_body()来获得http消息体。但从效率角度上考虑,我们并不自动为用户解码chunked编码,而是返回原始body。解码chunked编码可以用HttpChunkCursor,例如:
#include "workflow/HttpUtil.h"
void http_callback(WFHttpTask *task)
{
protocol::HttpResponse *resp = task->get_resp();
protocol::HttpChunkCursor cursor(resp);
const void *chunk;
size_t size;
while (cursor.next(&chunk, &size))
{
...
}
}
cursor.next操作每次返回一个chunk的起始位置指针和chunk大小,不进行内存拷贝。使用HttpChunkCursor之前无需判断消息是不是chunk编码,因为非chunk编码也可以认为整体就是一个chunk。
我们不推荐这个做法,因为任何任务都可以串进任务流,无需占用线程等待。如果一定要这样做,可以用我们提供的WFFuture来实现。请不要直接使用std::future,因为我们所有通信的callback和process都在一组线程里完成,使用std::future可能会导致所有线程都陷入等待,引发整体死锁。WFFuture通过动态增加线程的方式来解决这个问题。使用WFFuture还需要注意在任务的callback里把要保留的数据(一般是resp)通过std::move移动到结果里,否则callback之后数据会随着任务一起被销毁。
最常见的,同一个series里的任务共享series上下文,通过series的get_context()和set_context()的方法来读取和修改。而parallel在它的callback里,也可以通过series_at()获到它所包含的各个series(这些series的callback已经被调用,但会在parallel callback之后才被销毁),从而获取它们的上下文。由于parallel也是一种任务,所以它可以把汇总的结果通过它所在的series context继续传递。
总之,series是协程,series context就是协程的局部变量。parallel是协程的并行,可汇总所有协程的运行结果。
在我们的架构里,rpc是workflow上的应用,或者说rpc是workflow上的一组协议实现。如果你有接口描述,远程接口调用的需求,一定要试用一下srpc,这是一个把workflow的功能发挥到极致又和workflow完美融合的rpc系统,同时兼容brpc和thrift协议且更快更易用,满足你的任何rpc需求。地址:https://github.com/sogou/srpc
Server的stop()操作是优雅关闭,程序结束之前必须关闭所有server。stop()由shutdown()和wait_finish()组成,wait_finish会等待所有运行中server task所在series结束。也就是说,你可以在server task回复完成的callback里,继续向series追加任务。stop()操作会等待这些任务的结束。另外,如果你同时开多个server,最好的关闭方法是:
int main()
{
// 一个server对象不能start多次,所以多端口服务需要定义多个server对象
WFRedisServer server1(process);
WFRedisServer server2(process);
server1.start(8080);
server2.start(8888);
getchar(); // 输入回车结束
// 先全部关闭,再等待。
server1.shutdown();
server2.shutdown();
server1.wait_finish();
server2.wait_finish();
return 0;
}
因为server的结束由shutdown()和wait_finish()组成,显然就可以在process里shutdown,在main()里wait_finish,例如:
#include <string.h>
#include <atomic>
#include “workflow/WFHttpServer.h”
extern void process(WFHttpTask *task);
WFHttpServer server(process);
void process(WFHttpTask *task) {
if (strcmp(task->get_req()->get_request_uri(), “/stop”) == 0) {
static std::atomic<int> flag;
if (flag++ == 0)
server.shutdown();
task->get_resp()->append_output_body(“<html>server stop</html>”);
return;
}
/* Server’s logic */
// ....
}
int main() {
if (server.start(8888) == 0)
server.wait_finish();
return 0;
}
以上代码实现一个http server,在收到/stop的请求时结束程序。process中的flag是必须的,因为process并发执行,只能有一个线程来调用shutdown操作。
还是使用counter。在其它异步框架的回调里,对counter进行count操作。
void other_callback(server_task, counter, ...)
{
server_task->get_resp()->append_output_body(result);
counter->count();
}
void process(WFHttpTask *server_task)
{
WFCounterTask *counter = WFTaskFactory::create_counter_task(1, nullptr);
OtherAsyncTask *other_task = create_other_task(other_callback, server_task, counter);//非workflow框架的任务
other_task->run();
series_of(server_task)->push_back(counter);
}
注意以上代码里,counter->count()的调用可能先于counter的启动。但无论什么时序,程序都是完全正确的。
如果浏览器可以访问,但用workflow抓取失败,很大概率是因为站点使用了TLS扩展功能的SNI。可以通过全局配置打开workflow的客户端SNI功能:
struct WFGlobalSettings settings = GLOBAL_SETTINGS_DEFAULT;
settings.endpoint_params.use_tls_sni = true;
WORKFLOW_library_init(&settings);
开启这个功能是有一定代价的,所有https站点都会启动SNI,相同IP地址但不同域名的访问,将无法复用SSL连接。
因此用户也可以通过upstream功能,只打开对某个确定域名的SNI功能:
#Include "workflow/UpstreamManager.h"
int main()
{
UpstreamManager::upstream_create_weighted_random("www.sogou.com", false);
struct AddressParams params = ADDRESS_PARAMS_DEFAULT;
params.endpoint_params.use_tls_sni = true;
UpstreamManager::upstream_add_server("www.sogou.com", "www.sogou.com", ¶ms);
...
}
上面的代码把www.sogou.com设置为upstream名,并且加入一个同名的server,同时打开SNI功能。
方法一(只适用于http任务且无法重定向):
可以通过代理服务器的地址创建http任务,并重新设置request_uri和Host头。假设我们想通过代理服务器www.proxy.com:8080访问http://www.sogou.com/ ,方法如下:
task = WFTaskFactory::create_http_task("http://www.proxy.com:8080", 0, 0, callback);
task->set_request_uri("http://www.sogou.com/");
task->set_header_pair("Host", "www.sogou.com");
方法二(通用。但有些代理服务器只支持HTTPS。HTTP还是推荐用方法一):
通过带proxy_url的接口创建http任务:
class WFTaskFactory
{
public:
static WFHttpTask *create_http_task(const std::string& url,
const std::string& proxy_url,
int redirect_max, int retry_max,
http_callback_t callback);
};
其中proxy_url的格式为:http://user:[email protected]:port/
proxy只能是"http://"开头,而不能是"https://"。port默认值为80。
这个方法适用于http和https URL的代理,可以重定向,重定向时继续使用该代理服务器。
推荐使用wfrest项目,这是基于workflow的一套RESTful API开发框架,项目地址:https://github.com/wfrest/wfrest
修改了一下tutorial-06-parallel_wget.cc
的源码,同时发起1000个http请求(每个请求等待5秒后简单的返回hello world),前面的请求正常返回,后面的请求遇到了ERROR! state = 1, error = 11
的错误。
在http服务里记录了请求数量,发现实际只有200个请求,似乎在哪里有请求数量限制。
想知道这个错误的意义是什么,这个200个请求的限制是http client带来的还是并行任务处理器带来的。
希望编译后把 _include
_lib
拷走就能用
第一个问题:当CommConnEntry处于CONN_STATE_KEEPALIVE状态时,add entry alive_list;当CommConnEntry处于CONN_STATE_IDLE状态时,add entry idle_list;alive_list与idle_list有何区别?CONN_STATE_KEEPALIVE状态与CONN_STATE_IDLE状态有何区别?alive_list与idle_list释放的entry时机在什么情况下发生?
第二个小问题:ref的主要功能是什么那?CommService中ref与CommConnEntry中ref区别?(entry->ref handle前加1,handle后减1)
第三个问题:以下宏中CONN_STATE_RECEIVING的含义是?(不知为何没有CONN_STATE_SEND状态);
#define CONN_STATE_CONNECTING 0
#define CONN_STATE_CONNECTED 1
#define CONN_STATE_RECEIVING 2
#define CONN_STATE_SUCCESS 3
#define CONN_STATE_IDLE 4
#define CONN_STATE_KEEPALIVE 5
#define CONN_STATE_CLOSING 6
#define CONN_STATE_ERROR 7
I would like to point out that identifiers like “_POLLER_H_
” and “_WFKAFKACLIENT_H_
” do eventually not fit to the expected naming convention of the C++ language standard.
Would you like to adjust your selection for unique names?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.