qbusbridge: The Apache Kafka Client SDK

License: MIT License

Introduction (中文)

  • Qbusbridge is a client SDK for pub-sub messaging systems. Currently it supports Kafka and Pulsar.

    Users can switch between the supported pub-sub messaging systems by changing the configuration file. The default config targets Kafka; if you want to change it to Pulsar, change the config to:

    mq.type=pulsar
    # Other configs for pulsar...

    See config for more details.

    TODO: English config docs are currently missing.

  • Qbusbridge-Kafka is built on librdkafka under the hood. Most of the details of how to use it are hidden, which makes QBus simpler and easier to use than raw librdkafka. To produce and consume messages, users only need to invoke a few APIs, without needing to understand much about Kafka.

  • The reliability of message producing, which is often users' biggest concern, has been considerably improved.

Features

  • Multiple programming languages are supported, including C++, PHP, Python, and Go, with consistent APIs across languages.
  • A small set of interfaces, so it is easy to use.
  • For advanced users, adjusting librdkafka's configuration through profile files is also supported.
  • When writing data without keys, the SDK does its best to guarantee that messages are written successfully.
  • Two writing modes, synchronous and asynchronous, are supported.
  • For message consuming, the offset can be committed automatically or manually, depending on configuration.
  • When used with php-fpm, the connection is kept alive so that messages can be produced without interruption, avoiding the cost of recreating connections.

Compiling

Ensure your system has g++ (>= 4.8.5), boost (>= 1.41), cmake (>= 3.1) and swig (>= 3.0.12) installed.

In addition, the qbus SDK links libstdc++ statically, so you must ensure that libstdc++.a exists. For CentOS users, run:

sudo yum install -y glibc-static libstdc++-static

git clone:

git clone --recursive https://github.com/ntt360/qbusbridge.git

SASL Support

If you need librdkafka to support Kafka SASL authentication, you also need to install:

sudo yum install -y cyrus-sasl-devel

If you also use GSSAPI authentication, you need to compile the corresponding plugin:

sudo yum install -y cyrus-sasl-gssapi

1. Install submodules

Run ./build_dependencies.sh.

It will automatically download the submodules and install them to cxx/thirdparts/local, where CMakeLists.txt finds headers and libraries.

See ./cxx/thirdparts/local:

include/
  librdkafka/
    rdkafka.h
  log4cplus/
    logger.h
lib/
  librdkafka.a
  liblog4cplus.a

If you want SASL support, after compiling you can go to the cxx/thirdparts/librdkafka/examples/ directory and run the following command to test whether the SASL components were compiled successfully:

cd cxx/thirdparts/librdkafka/examples/
./rdkafka_example -X builtin.features
# builtin.features = gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer,http,oidc

Make sure the builtin.features output includes the SASL-related authentication modules!

2. Build SDK

C++

Navigate to the cxx directory and run ./build.sh; the following files will be generated:

include/
  qbus_consumer.h
  qbus_producer.h
lib/
  debug/libQBus.so
  release/libQBus.so

Though building the C++ SDK requires C++11 support, the SDK can be used with an older g++, e.g. build the qbus SDK with g++ 4.8.5 and use it with g++ 4.4.7.

Go

Navigate to the golang directory and run ./build.sh; the following files will be generated:

gopath/
  src/
    qbus/
      qbus.go
      libQBus_go.so

You can enable Go modules for the examples by running USE_GO_MOD=1 ./build.sh. Then the following files will be generated:

examples/
  go.mod
  qbus/
    qbus.go
    go.mod
    libQBus_go.so

Python

Navigate to the python directory and run ./build.sh; the following files will be generated:

examples/
  qbus.py
  _qbus.so

PHP

Navigate to the php directory and run ./build.sh; the following files will be generated:

examples/
  qbus.php
  qbus.so

3. Build examples

C++

Navigate to the examples subdirectory and run ./build.sh [debug|release] to generate the executables. debug links against libQBus.so in the lib/debug subdirectory; release links against libQBus.so in the lib/release subdirectory. Run make clean to delete them.

If you want to build your own programs, see how the Makefile does it.

Go

Navigate to the examples subdirectory and run ./build.sh to generate the executables; run ./clean.sh to delete them.

Add the path of libQBus_go.so to the LD_LIBRARY_PATH environment variable, e.g.

export LD_LIBRARY_PATH=$PWD/gopath/src/qbus:$LD_LIBRARY_PATH

If you want to build your own programs, add the generated gopath directory to the GOPATH environment variable, or move the gopath/src/qbus directory to $GOPATH/src.

Python

Copy the generated qbus.py and _qbus.so to the directory of the Python scripts that use them.

PHP

Edit php.ini and add extension=<module-path>, where <module-path> is the path to qbus.so.

Usage

Data Producing

  • When writing data without keys, the SDK does its best to deliver every single message. As long as at least one broker in the Kafka cluster is healthy, it will keep trying to resend.
  • Writing data only requires invoking the produce interface; in asynchronous mode, the return value tells you whether the send queue is full.
  • In synchronous mode, produce directly returns whether the current message was written successfully, but at the cost of some extra performance and CPU usage, so asynchronous mode is recommended.
  • The following C++ declarations show the produce interface:
bool QbusProducer::init(const string& broker_list,
                        const string& log_path,
                        const string& config_path,
                        const string& topic_name);
bool QbusProducer::produce(const char* data,
                           size_t data_len,
                           const std::string& key);
void QbusProducer::uninit();
  • C++ SDK use example:
#include <iostream>
#include <string>

#include "qbus_producer.h"

int main(int argc, const char* argv[]) {
    qbus::QbusProducer qbus_producer;
    // broker list, log path, config path, topic name
    if (!qbus_producer.init("127.0.0.1:9092",
                            "./log",
                            "./config",
                            "topic_test")) {
        std::cout << "Failed to init" << std::endl;
        return 1;
    }

    std::string msg("test\n");
    if (!qbus_producer.produce(msg.c_str(), msg.length(), "key")) {
        std::cout << "Failed to produce" << std::endl;
    }

    qbus_producer.uninit();

    return 0;
}
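The asynchronous-mode contract described above, where produce returns false when the send queue is full, can be illustrated with a self-contained sketch in plain C++. This is not the qbus implementation; BoundedSendQueue and its members are illustrative names only:

```cpp
#include <cstddef>
#include <queue>
#include <string>

// Minimal sketch of a bounded send queue with the same contract as the
// asynchronous produce interface: enqueue on success, false when full.
class BoundedSendQueue {
public:
    explicit BoundedSendQueue(std::size_t capacity) : capacity_(capacity) {}

    // Mirrors the async produce() contract: true if queued, false if full.
    bool produce(const std::string& msg) {
        if (queue_.size() >= capacity_) {
            return false;  // send queue full: the caller should back off
        }
        queue_.push(msg);
        return true;
    }

    std::size_t pending() const { return queue_.size(); }

private:
    std::size_t capacity_;
    std::queue<std::string> queue_;
};
```

In real code, a false return from the asynchronous produce is the signal to back off, retry later, or drop the message.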

Data Consuming

  • Consuming data only requires invoking subscribeOne to subscribe to a topic (subscribing to multiple topics is also supported). The call does not block the current process; every message is delivered back to the user through a callback.
  • The SDK also supports committing offsets manually: users can commit the offset of the message body returned through the callback.
  • The following C++ declarations show the consuming interface:
bool QbusConsumer::init(const std::string& broker_list,
                        const std::string& log_path,
                        const std::string& config_path,
                        const QbusConsumerCallback& callback);
bool QbusConsumer::subscribeOne(const std::string& group, const std::string& topic);
bool QbusConsumer::subscribe(const std::string& group,
                             const std::vector<std::string>& topics);
bool QbusConsumer::start();
void QbusConsumer::stop();
bool QbusConsumer::pause(const std::vector<std::string>& topics);
bool QbusConsumer::resume(const std::vector<std::string>& topics);
  • C++ SDK use example:
#include <unistd.h>  // for sleep()

#include <iostream>
#include <string>

#include "qbus_consumer.h"

qbus::QbusConsumer qbus_consumer;

class MyCallback : public qbus::QbusConsumerCallback {
 public:
    virtual void deliveryMsg(const std::string& topic,
                             const char* msg,
                             const size_t msg_len) const {
        std::cout << "topic: " << topic
                  << " | msg: " << std::string(msg, msg_len) << std::endl;
    }
};

int main(int argc, char* argv[]) {
    MyCallback my_callback;
    if (qbus_consumer.init("127.0.0.1:9092",
                           "log",
                           "config",
                           my_callback)) {
        if (qbus_consumer.subscribeOne("groupid_test", "topic_test")) {
            if (!qbus_consumer.start()) {
                std::cout << "Failed to start" << std::endl;
                return 1;  // was `return NULL`, which is not a valid int return
            }

            while (1) sleep(1);  // other operations can go here

            qbus_consumer.stop();
        } else {
            std::cout << "Failed to subscribe" << std::endl;
        }
    } else {
        std::cout << "Failed to init" << std::endl;
    }
    return 0;
}

You can use the pause() and resume() methods to pause or resume consuming certain topics; see qbus_pause_resume_example.cc.
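The manual offset commit mentioned above can be illustrated with a self-contained sketch in plain C++ (no qbus dependency; OffsetTracker is an illustrative name, not part of the qbus API): the consumer only advances its committed position when the application explicitly commits after processing a message.

```cpp
#include <cstdint>

// Minimal sketch of manual offset management: delivery and commit are
// separate steps, so a crash before commit() re-delivers the message.
class OffsetTracker {
public:
    // Record the offset of each delivered message.
    void delivered(std::int64_t offset) { last_delivered_ = offset; }

    // Manual commit: persist the last processed offset.
    void commit() { committed_ = last_delivered_; }

    std::int64_t committed() const { return committed_; }

private:
    std::int64_t last_delivered_ = -1;  // -1 means nothing delivered yet
    std::int64_t committed_ = -1;       // -1 means nothing committed yet
};
```

With automatic commit, the SDK performs the commit() step itself; manual commit gives the application control over exactly when an offset is considered processed.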

See the C, C++, Go, Python, and PHP examples for more usage.

CONFIGURATION

The configuration file is in INI format:

[global]

[topic]

[sdk]

See the rdkafka 1.0.x configuration for the [global] and [topic] sections, and the sdk configuration for the [sdk] section.

Normally kafkabridge works with an empty configuration file, but if your broker version is below 0.10.0.0, you must specify the api.version-related configuration parameters; see broker version compatibility.

e.g. for broker 0.9.0.1, the following configurations are necessary:

[global]
api.version.request=false
broker.version.fallback=0.9.0.1

The default config is currently compatible with broker 0.9.0.1. Therefore, if a higher-version broker is used, api.version.request should be set to true; otherwise the older message protocol is used, e.g. without the timestamp field.
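For brokers of version 0.10.0.0 or newer, the version request can therefore be enabled so the client uses the newer protocol (including the timestamp field):

```ini
[global]
api.version.request=true
```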

Contact

QQ group: 876834263

qbusbridge's People

Contributors

alexstocks, banditsmile, bewaremypower, davidliuxh, litao3rd, mr-wing, nathan6

qbusbridge's Issues

./build_dependencies.sh did not succeed

Could you explain what causes the following error?

-- Boost version: 1.70.0
-- Found the following Boost libraries:
-- regex
-- system
-- HAS_ZSTD: 0
-- HAS_SNAPPY: 0
-- VERSION: 2.6.1
CMake Error: The following variables are used in this project, but they are set to NOTFOUND.
Please set them or make sure they are set and tested correctly in the CMake files:
CURL_LIBRARIES
linked by target "pulsarStatic" in directory /home/qbusbridge/cxx/thirdparts/pulsar/pulsar-client-cpp/lib

-- Configuring incomplete, errors occurred!
See also "/home/qbusbridge/cxx/thirdparts/pulsar/pulsar-client-cpp/_builds/CMakeFiles/CMakeOutput.log".

ImportError

python producer.py /home/work/qbusbridge/config/kafka.ini mini_spider_struct_data groupid_test ip1:port1,ip2:port2,ip3:port3
Traceback (most recent call last):
File "producer.py", line 1, in
import qbus
File "/home/work/qbusbridge/python/examples/qbus.py", line 26, in
_qbus = swig_import_helper()
File "/home/work/qbusbridge/python/examples/qbus.py", line 22, in swig_import_helper
_mod = imp.load_module('_qbus', fp, pathname, description)
ImportError: /home/work/qbusbridge/python/examples/_qbus.so: undefined symbol: _ZTVN6snappy22UncheckedByteArraySinkE

PHP cannot be built

../cxx/src/qbus_record_msg.h:6:30: fatal error: log4cplus/logger.h: No such file or directory
compilation terminated.
In file included from ../cxx/src/qbus_consumer.cc:3:0:
../cxx/util/logger.h:4:30: fatal error: log4cplus/logger.h: No such file or directory
compilation terminated.
../cxx/src/qbus_producer.cc:9:47: fatal error: thirdparts/librdkafka/src/rdkafka.h: No such file or directory
compilation terminated.
In file included from ../cxx/src/qbus_config.cc:1:0:
../cxx/src/qbus_config.h:6:41: fatal error: boost/property_tree/ptree.hpp: No such file or directory
compilation terminated.
In file included from ../cxx/src/qbus_helper.cc:1:0:
../cxx/src/qbus_helper.h:8:47: fatal error: thirdparts/librdkafka/src/rdkafka.h: No such file or directory
compilation terminated.
In file included from ../cxx/util/logger.cc:1:0:
../cxx/util/logger.h:4:30: fatal error: log4cplus/logger.h: No such file or directory
compilation terminated.
In file included from ../cxx/src/qbus_consumer_imp.cc:1:0:
../cxx/src/qbus_consumer_imp.h:9:47: fatal error: thirdparts/librdkafka/src/rdkafka.h: No such file or directory
compilation terminated.
qbus_wrap.cxx:747:18: fatal error: zend.h: No such file or directory

Memory keeps increasing

qbus::QbusProducer triggerQbusProducer;
string dispatchTopic = DISPATCH_CONFIG_NAME;
this->QbusProducerInit(triggerQbusProducer, dispatchTopic);
this->QbusProducerData(triggerQbusProducer, sendData);
this->QbusProducerDeinit(triggerQbusProducer);

This is how it is used; every coroutine calls it once.

Hello, running ./build_librdkafka.sh with Git Bash on Windows 7 fails to compile with errors. What could be causing this?

$ ./build_librdkafka.sh
make[1]: Entering directory 'C:/Users/win7/kafkabridge/cxx/thirdparts/librdkafka/tests'
rm -f .test 0039-event.o 0076-produce_retry.o 0028-long_topicnames.o 0031-get_offsets.o 0051-assign_adds.o 0074-producev.o 0011-produce_batch.o 0004-conf.o 0021-rkt_destroy.o 0081-admin.o 0013-null-msgs.o 0068-produce_timeout.o 0038-performance.o 0046-rkt_cache.o 0008-reqacks.o 0036-partial_fetch.o 0073-headers.o 0020-destroy_hang.o 0035-api_version.o 0006-symbols.o 0092-mixed_msgver.o 0084-destroy_flags.o 0086-purge.o 0050-subscribe_adds.o 0069-consumer_add_parts.o 0044-partition_cnt.o 0030-offset_commit.o 0029-assign_offset.o 0026-consume_pause.o 0034-offset_reset.o 0018-cgrp_term.o 0000-unittests.o 0094-idempotence_msg_timeout.o 0055-producer_latency.o 0003-msgmaxsize.o 0083-cb_event.o 0017-compression.o 0093-holb.o 0052-msg_timestamps.o 0002-unkpart.o 0040-io_event.o 0037-destroy_hang_local.o 0072-headers_ut.o 0025-timers.o 0048-partitioner.o 0089-max_poll_interval.o 0012-produce_consume.o 0047-partial_buf_tmout.o 0049-consume_conn_close.o 0007-autotopic.o 0014-reconsume-191.o 0022-consume_batch.o 0001-multiobj.o 0033-regex_subscribe.o 0077-compaction.o 0062-stats_event.o 0019-list_groups.o 0091-max_poll_interval_timeout.o 0088-produce_metadata_timeout.o 0080-admin_ut.o 0045-subscribe_update.o 0005-order.o 0041-fetch_max_bytes.o 0043-no_connection.o 0015-offset_seeks.o 0056-balanced_group_mt.o 0042-many_topics.o 0075-retry.o 0064-interceptors.o 0090-idempotence.o 0079-fork.o 0066-plugins.o 0057-invalid_topic.o 0061-consumer_lag.o 0070-null_empty.o 0053-stats_cb.o 0082-fetch_max_bytes.o 0095-all_brokers_down.o 8000-idle.o 0063-clusterid.o 0058-log.o 0078-c_from_cpp.o 0085-headers.o 0065-yield.o 0060-op_prio.o 0067-empty_topic.o 0054-offset_time.o 0059-bsearch.o test.o testcpp.o tinycthread.o tinycthread_extra.o rdlist.o sockem.o sockem_ctrl.o merged
C:/Program Files/Git/mingw64/bin/make -C interceptor_test clean
make[2]: Entering directory 'C:/Users/win7/kafkabridge/cxx/thirdparts/librdkafka/tests/interceptor_test'
rm -f interceptor_test.o interceptor_test.d
rm -f interceptor_test
.a interceptor_test.1 interceptor_test
interceptor_test.lds
make[2]: Leaving directory 'C:/Users/win7/kafkabridge/cxx/thirdparts/librdkafka/tests/interceptor_test'
make[1]: Leaving directory 'C:/Users/win7/kafkabridge/cxx/thirdparts/librdkafka/tests'
make[1]: Entering directory 'C:/Users/win7/kafkabridge/cxx/thirdparts/librdkafka/examples'
rm -f rdkafka_example rdkafka_performance rdkafka_example_cpp rdkafka_consumer_example rdkafka_consumer_example_cpp kafkatest_verifiable_client rdkafka_simple_producer rdkafka_idempotent_producer
make[1]: Leaving directory 'C:/Users/win7/kafkabridge/cxx/thirdparts/librdkafka/examples'
/usr/bin/sh: C:/Program: No such file or directory
/usr/bin/sh: C:/Program: No such file or directory
make: *** [Makefile:50: clean] Error 127
checking for OS or distribution... ok (mingw64_nt-6.1-7601)
checking for C compiler from CC env... failed
checking for gcc (by command)... failed
checking for clang (by command)... failed
checking for cc (by command)... failed (fail)
checking for C++ compiler from CXX env... failed
checking for C++ compiler (g++)... failed
checking for C++ compiler (clang++)... failed
checking for C++ compiler (c++)... failed (fail)
checking executable ld... failed (disable)
checking executable nm... failed (disable)
checking executable objdump... failed (disable)
checking executable strip... failed (disable)
checking for pkgconfig (by command)... failed
checking for install (by command)... ok
checking for PIC (by compile)... failed (disable)
checking for GNU-compatible linker options... failed
checking for OSX linker options... failed
checking for GNU linker-script ld flag... failed
checking for Solaris linker-script ld flag... failed (ignore)
checking for __atomic_32 (by compile)... failed
checking for __atomic_32_lib (by compile)... failed
checking for __sync_32 (by compile)... failed (disable)
checking for __atomic_64 (by compile)... failed
checking for __atomic_64_lib (by compile)... failed
checking for __sync_64 (by compile)... failed (disable)
checking for socket (by compile)... failed
checking for socket_nsl (by compile)... failed (fail)
parsing version '0x010001ff'... ok (1.0.1)
checking for librt (by compile)... failed
checking for libpthread (by compile)... failed (fail)
checking for c11threads (by compile)... failed (disable)
checking for libdl (by compile)... failed (disable)
checking for zlib (by compile)... failed (disable)
checking for zstd (by compile)... failed (disable)
checking for libm (by compile)... failed (disable)
checking for liblz4 (by compile)... failed (disable)
checking for rapidjson (by compile)... failed (disable)
checking for crc32chw (by compile)... failed (disable)
checking for regex (by compile)... failed (disable)
checking for strndup (by compile)... failed (disable)
checking for strerror_r (by compile)... failed (disable)
checking for pthread_setname_gnu (by compile)... failed (disable)
checking for nm (by env NM)... failed

###########################################################

Configure failed

###########################################################

Accumulated failures:

###########################################################
cc (WITH_CC)
module: cc
action: fail
reason:
command 'cc --version' failed:
mklove/modules/configure.base: line 1692: cc: command not found

cxx (WITH_CXX) C++ compiler (c++)
module: cc
action: fail
reason:
command 'c++ --version' failed:
mklove/modules/configure.base: line 1692: c++: command not found

socket_nsl ()
module: socket
action: fail
reason:
compile check failed:
CC: CC
flags:
-g -O2 -Wall -Wsign-compare -Wfloat-equal -Wpointer-arith -Wcast-align -Wall -Werror _mkltmpGztdD4.c -o _mkltmpGztdD4.c.o -lsocket -lnsl :
mklove/modules/configure.base: line 1326: -g: command not found
source:
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
void foo (void) {
int s = socket(0, 0, 0);
close(s);
}

libpthread ()
module: self
action: fail
reason:
compile check failed:
CC: CC
flags: -lpthread
-g -O2 -Wall -Wsign-compare -Wfloat-equal -Wpointer-arith -Wcast-align -Wall -Werror _mkltmpL9265y.c -o _mkltmpL9265y.c.o -lpthread:
mklove/modules/configure.base: line 1326: -g: command not found
source: #include <pthread.h>

sed: can't read /c/Users/win7/kafkabridge/cxx/thirdparts/librdkafka/config.h: No such file or directory
sed: can't read /c/Users/win7/kafkabridge/cxx/thirdparts/librdkafka/config.h: No such file or directory
sed: can't read /c/Users/win7/kafkabridge/cxx/thirdparts/librdkafka/config.h: No such file or directory
sed: can't read /c/Users/win7/kafkabridge/cxx/thirdparts/librdkafka/config.h: No such file or directory
./Makefile.config missing: please run ./configure
make: *** [mklove/Makefile.base:73: mklove-check] Error 1
