Flink Connector Redis

The Redis connector allows for reading data from and writing data into Redis.

How to create a Redis Table

string

CREATE TABLE RedisTable (
  `key`   STRING,
  `value` STRING
) WITH (
  'connector' = 'redis',
  'redis.host' = 'localhost',
  'redis.port' = '6379',
  'redis.data-type' = 'string',
  'redis.key-pattern' = '*'
)
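
As a rough sketch of how the string table above might be used: the README only states that the connector supports reading and writing, so the statements below assume the same table definition is accepted both as a sink and as a source. The key `foo` and value `bar` are purely illustrative.

```sql
-- Write one key/value pair (assumes the table can be used as a sink).
INSERT INTO RedisTable VALUES ('foo', 'bar');

-- Read back the keys matched by 'redis.key-pattern' (assumes the table can be used as a source).
SELECT `key`, `value` FROM RedisTable;
```

`key` and `value` are quoted with backticks because both are reserved words in Flink SQL.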

list/set

CREATE TABLE RedisTable (
  `key`   STRING,
  `value` ARRAY<STRING>
) WITH (
  'connector' = 'redis',
  'redis.host' = 'localhost',
  'redis.port' = '6379',
  'redis.data-type' = 'list', -- or 'set'
  'redis.key-pattern' = '*'
)

sorted set

CREATE TABLE RedisTable (
  `key`   STRING,
  `value` ARRAY<ROW<score DOUBLE, elem STRING>>
) WITH (
  'connector' = 'redis',
  'redis.host' = 'localhost',
  'redis.port' = '6379',
  'redis.data-type' = 'sorted_set',
  'redis.key-pattern' = '*'
)

or

CREATE TABLE RedisTable (
  `key`   STRING,
  `score` DOUBLE,
  `elem`  STRING
) WITH (
  'connector' = 'redis',
  'redis.host' = 'localhost',
  'redis.port' = '6379',
  'redis.data-type' = 'sorted_set',
  'redis.key-pattern' = '*'
)

hash

CREATE TABLE RedisTable (
  `key`   STRING,
  `value` MAP<STRING, STRING>
) WITH (
  'connector' = 'redis',
  'redis.host' = 'localhost',
  'redis.port' = '6379',
  'redis.data-type' = 'hash',
  'redis.key-pattern' = '*'
)

or

CREATE TABLE RedisTable (
  `key`   STRING,
  `field` STRING,
  `value` STRING
) WITH (
  'connector' = 'redis',
  'redis.host' = 'localhost',
  'redis.port' = '6379',
  'redis.data-type' = 'hash',
  'redis.key-pattern' = '*'
)

pubsub

publish:

CREATE TABLE RedisTable_publish (
  `id`   INT,
  `name` STRING,
  `age`  INT
) WITH (
  'connector' = 'redis',
  'redis.host' = 'localhost',
  'redis.port' = '6379',
  'redis.data-type' = 'pubsub',
  'redis.pubsub.publish-channel' = 'test',
  'format' = 'csv'
)

subscribe:

CREATE TABLE RedisTable_subscribe (
  `id`   INT,
  `name` STRING,
  `age`  INT
) WITH (
  'connector' = 'redis',
  'redis.host' = 'localhost',
  'redis.port' = '6379',
  'redis.data-type' = 'pubsub',
  'redis.pubsub.subscribe-patterns' = 'test',
  'format' = 'csv'
)
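
A minimal sketch of driving the publishing side, assuming the table declared with 'redis.pubsub.publish-channel' accepts ordinary INSERT statements. The row values are illustrative only.

```sql
-- Publish one row to channel 'test'; the payload is the row encoded by the 'csv' format.
INSERT INTO RedisTable_publish VALUES (1, 'alice', 30);
```

A continuous SELECT on the table declared with 'redis.pubsub.subscribe-patterns' would then be expected to emit the same row, decoded back into `id`, `name` and `age`.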

Connector Options

Basic options for using flink-connector-redis:

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | Specifies which connector to use; for Redis, use 'redis'. |
| redis.deploy-mode | optional | single | enum | Deploy mode for Redis. Other options: 'sentinel', 'cluster', 'sharded'. |
| redis.host | optional | (none) | String | Required if Redis is in single mode. |
| redis.port | optional | (none) | String | Required if Redis is in single mode. |
| redis.master | optional | (none) | String | Required if Redis is in sentinel mode. |
| redis.masters | optional | (none) | String | Required if Redis is in sharded mode. |
| redis.sentinels | optional | (none) | String | Required if Redis is in sentinel/sharded mode. Comma-separated list of Redis sentinels. |
| redis.host-and-ports | optional | (none) | String | Required if Redis is in cluster mode. Comma-separated list of Redis cluster HostAndPort entries. |
| redis.user | optional | (none) | String | Required if Redis has a user set. |
| redis.password | optional | (none) | String | Required if Redis has a password set. |
| redis.data-type | required | (none) | enum | Data type for Redis. Possible options: string, list, set, sorted_set, hash, pubsub. |
| redis.key-pattern | optional | (none) | String | Required when scanning Redis. |
| redis.pubsub.subscribe-patterns | optional | (none) | String | Subscribe patterns for Redis PUBSUB. |
| redis.pubsub.publish-channel | optional | (none) | String | Publish channel for Redis PUBSUB. |
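
The examples earlier all use single mode. As a hedged sketch of the other deploy modes, the definitions below combine the options listed above for sentinel and cluster mode. The table names, the master name `mymaster`, the host names, and the `host:port` entry format are assumptions made for illustration; the README only says the lists are comma separated.

```sql
-- Sentinel mode: 'redis.master' and 'redis.sentinels' take the place of host/port.
CREATE TABLE RedisSentinelTable (
  `key`   STRING,
  `value` STRING
) WITH (
  'connector' = 'redis',
  'redis.deploy-mode' = 'sentinel',
  'redis.master' = 'mymaster',                            -- hypothetical master name
  'redis.sentinels' = 'sentinel1:26379,sentinel2:26379',  -- assumed host:port entries
  'redis.data-type' = 'string',
  'redis.key-pattern' = '*'
);

-- Cluster mode: 'redis.host-and-ports' lists the cluster nodes.
CREATE TABLE RedisClusterTable (
  `key`   STRING,
  `value` STRING
) WITH (
  'connector' = 'redis',
  'redis.deploy-mode' = 'cluster',
  'redis.host-and-ports' = 'node1:7000,node2:7000,node3:7000',  -- assumed host:port entries
  'redis.data-type' = 'string',
  'redis.key-pattern' = '*'
);
```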

Lookup options for flink-connector-redis:

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| lookup.cache.max-rows | optional | -1 | Long | The maximum number of rows in the lookup cache; beyond this value, the oldest rows are evicted. 'lookup.cache.max-rows' and 'lookup.cache.ttl' must both be specified if either of them is specified. The cache is disabled by default. |
| lookup.cache.ttl | optional | 10s | Duration | The cache time to live. |
| lookup.async | optional | false | Boolean | Whether to perform lookups asynchronously. |
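
To show where the lookup options come into play, here is a sketch of a lookup join written in standard Flink SQL. It assumes the connector can look up a string value by `key`; the table names `Orders` and `UserNames`, the column names, and the cache sizes are hypothetical, and the `datagen` source is only a stand-in for a real stream.

```sql
-- Probe side: any stream with a processing-time attribute.
CREATE TABLE Orders (
  order_id  STRING,
  user_id   STRING,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Lookup side: Redis strings, with the lookup cache enabled.
CREATE TABLE UserNames (
  `key`   STRING,
  `value` STRING
) WITH (
  'connector' = 'redis',
  'redis.host' = 'localhost',
  'redis.port' = '6379',
  'redis.data-type' = 'string',
  'lookup.cache.max-rows' = '1000',  -- must be set together with lookup.cache.ttl
  'lookup.cache.ttl' = '60s'
);

-- Enrich each order with the value stored under its user_id.
SELECT o.order_id, u.`value` AS user_name
FROM Orders AS o
JOIN UserNames FOR SYSTEM_TIME AS OF o.proc_time AS u
  ON o.user_id = u.`key`;
```

With the cache enabled, a repeated `user_id` can be served from memory until the entry expires or is evicted, at the cost of possibly stale values.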

Advanced options for flink-connector-redis:

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| redis.ignore-error | optional | false | Boolean | Ignore errors on Redis query/insert. |
| redis.batch-size | optional | 1 | Integer | Batch size of the Redis sink. |
| redis.linger-ms | optional | 200 | Long | Maximum delay, in milliseconds, before the Redis sink sends a batch. |
| redis.client.timeout | optional | 30s | Duration | Redis client timeout duration. |
| redis.pool.maxWaitMillis | optional | 10000 | Long | Redis sentinel pool max wait, in milliseconds. |
| redis.pool.testWhileIdle | optional | false | Boolean | Redis connection pool test while idle. |
| redis.pool.timeBetweenEvictionRunsMillis | optional | 30000 | Long | Redis connection pool time between eviction runs, in milliseconds. |
| redis.pool.numTestsPerEvictionRun | optional | -1 | Integer | Redis connection pool number of tests per eviction run. |
| redis.pool.minEvictableIdleTimeMillis | optional | 60000 | Long | Redis connection pool min evictable idle time, in milliseconds. |
| redis.pool.maxTotal | optional | 2 | Integer | Redis connection pool max total. |
| redis.pool.maxIdle | optional | 1 | Integer | Redis connection pool max idle. |
| redis.pool.minIdle | optional | 1 | Integer | Redis connection pool min idle. |
| redis.pool.testOnBorrow | optional | true | Boolean | Redis connection pool test on borrow. |
| redis.pool.testOnReturn | optional | true | Boolean | Redis connection pool test on return. |
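
As an illustration of the sink-related options, the sketch below tunes batching on a string table. The table name and the chosen values are hypothetical. The README does not spell out the flush rule, so the comments describe the presumed behaviour (flush on whichever limit is hit first).

```sql
CREATE TABLE RedisBatchedSink (
  `key`   STRING,
  `value` STRING
) WITH (
  'connector' = 'redis',
  'redis.host' = 'localhost',
  'redis.port' = '6379',
  'redis.data-type' = 'string',
  'redis.batch-size' = '100',     -- buffer up to 100 records per batch
  'redis.linger-ms' = '500',      -- presumably: send a partial batch after at most 500 ms
  'redis.ignore-error' = 'true'   -- do not fail the job on Redis errors
);
```

The defaults (batch size 1, linger 200 ms) amount to writing record by record; larger batches trade latency for fewer round trips.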

UDF

We also provide some UDFs. Here is an example:

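import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
// REDIS_HOST, REDIS_PORT, RedisOptions and FlinkRedisUdf are provided by
// flink-connector-redis; import them from the connector's packages.

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

  // Connection settings for a Redis instance in single mode.
  String host = "localhost";
  String port = "6379";

  final Properties properties = new Properties();
  properties.put(REDIS_HOST.key(), host);
  properties.put(REDIS_PORT.key(), port);
  final RedisOptions options = new RedisOptions(properties);

  // Register the Redis UDFs (e.g. redis_get) with the table environment.
  FlinkRedisUdf.registerTo(tableEnv, options);

  // Fetch the string stored under key 'foo'.
  tableEnv.executeSql("select redis_get('foo')").print();
  /* 
     +----+--------+
     | op | EXPR$0 |
     +----+--------+
     | +I |    bar |
     +----+--------+
     1 row in set
  */
}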

Data Type Mapping

Redis stores keys and values as bytes/strings. Redis messages can also be serialized and deserialized by formats, e.g. csv, json, avro, in which case the data type mapping is determined by the specific format. Please refer to the Formats pages for more details.
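
As a sketch of how a format drives the mapping, the table below swaps the 'csv' format from the pubsub example for 'json'. The table name is hypothetical, and the sketch assumes the Flink JSON format is available on the classpath.

```sql
CREATE TABLE RedisJsonEvents (
  `id`   INT,
  `name` STRING,
  `age`  INT
) WITH (
  'connector' = 'redis',
  'redis.host' = 'localhost',
  'redis.port' = '6379',
  'redis.data-type' = 'pubsub',
  'redis.pubsub.subscribe-patterns' = 'test',
  'format' = 'json'  -- fields are matched to columns by name
);
```

With this definition, a message such as `{"id": 1, "name": "alice", "age": 30}` would be decoded by the JSON format into one row, with each field converted to the declared column type.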

Contributors

yagagagaga
