Redis

概况

Redis Load 节点支持将数据写入 Redis 。

Data Type

详细见:Redis数据类型

Plain

c1c2c3c4c5c6c7
rowKey

Redis 字符串命令用于管理 Redis 中的字符串值。

第一个元素是 Redis 行键,必须是字符串类型,其余字段 (‘c2’ ~ ‘c6’) 将被序列化为一个值并放入 Redis 中。

Hash

Redis Hash 是一种数据类型,表示字符串字段和字符串之间的映射。其有两个成员:

  • Redis 哈希字段
  • Redis 哈希值

Set

Redis SET 是简单的字符串列表,按插入顺序排序。你可以在 Redis Set 的头部或尾部添加元素。

BitMap

BITMAP 不是一种实际的数据类型,而是在 String 对象上定义的一组面向 Bit 的类型。由于字符串是 binary safe blobs,其最大长度为512 MB, 它们适合设置多达 2^32 个不同的 Bit。

SchemaMappingMode

Dynamic

Dynamic 模式映射 java.util.map 到 RedisDataType,该模式下有两个成员:

  • Redis key
  • java.util.Map, 它将被迭代,其中键为 Redis key,值为 Redis value

Static Prefix Match

其中至少有两个字段,第一个成员是 Redis key,第二个字段中的每个字段代表一个 Redis value

  1. key, field, value1, value2, value3, [value]...

Static KV Pair

其有两个字段,第一个字段是 key ,和其他字段是键值对

  1. key, field1, value1,field2,value2,field3,value3,[field,value]...

SQL demo

Plain

Plain 只支持 Static Prefix Match 模式

  1. CREATE TABLE sink (
  2. key STRING,
  3. aaa STRING,
  4. bbb DOUBLE,
  5. ccc BIGINT,
  6. PRIMARY KEY (`key`) NOT ENFORCED
  7. ) WITH (
  8. 'connector' = 'redis-inlong',
  9. 'sink.batch-size' = '1',
  10. 'format' = 'csv',
  11. 'data-type' = 'PLAIN',
  12. 'redis-mode' = 'standalone',
  13. 'host' = 'localhost',
  14. 'port' = '56615',
  15. 'maxIdle' = '8',
  16. 'minIdle' = '1',
  17. 'maxTotal' = '2',
  18. 'timeout' = '2000'
  19. );

Hash with Prefix Match

c1c2c3c4c5c6c7
rowKeyfield: String

第一个元素是 Redis Key,必须是字符串类型
第二个元素是哈希数据类型中的 Redis 字段名
其余字段 (‘ c2 ‘ ~ ‘ c7 ‘) 将被序列化为一个值并放入 Redis 中

  1. CREATE TABLE sink (
  2. key STRING,
  3. field_name STRING,
  4. value_1 DOUBLE,
  5. value_2 BIGINT,
  6. PRIMARY KEY (`key`) NOT ENFORCED
  7. ) WITH (
  8. 'connector' = 'redis-inlong',
  9. 'sink.batch-size' = '1',
  10. 'format' = 'csv',
  11. 'data-type' = 'HASH',
  12. 'redis-mode' = 'standalone',
  13. 'host' = 'localhost',
  14. 'port' = '56869',
  15. 'maxIdle' = '8',
  16. 'minIdle' = '1',
  17. 'maxTotal' = '2',
  18. 'timeout' = '2000'
  19. );

Hash with Static KV Pair

c1c2c3c4c5c6c7
rowKeyfield1: Stringvalue 1:Stringfield 2: Stringvalue 2:Stringfield 3: Stringvalue 3:String

第一个元素是 Redis 行键,必须是字符串类型。 奇数元素 (‘ c2 ‘ / ‘ c4 ‘ / ‘ c6 ‘) 是哈希数据类型中的 Redis 字段名,必须是字符串类型。 偶数元素 (‘ c3 ‘ / ‘ c5 ‘ / ‘ c7 ‘) 是哈希数据类型中的 Redis 字段值,必须是字符串类型。

  1. CREATE TABLE sink (
  2. key STRING,
  3. field1 STRING,
  4. value1 STRING,
  5. field2 STRING,
  6. value2 STRING,
  7. PRIMARY KEY (`key`) NOT ENFORCED
  8. ) WITH (
  9. 'connector' = 'redis-inlong',
  10. 'sink.batch-size' = '1',
  11. 'format' = 'csv',
  12. 'data-type' = 'HASH',
  13. 'schema-mapping-mode' = 'STATIC_KV_PAIR',
  14. 'redis-mode' = 'standalone',
  15. 'host' = 'localhost',
  16. 'port' = '6379',
  17. 'maxIdle' = '8',
  18. 'minIdle' = '1',
  19. 'maxTotal' = '2',
  20. 'timeout' = '2000'
  21. );

Hash with Dynamic

c1c2
rowKeyfieldValueMap

第一个元素是 Redis 行键,必须是字符串类型。 第二个元素必须是 Map<String,String>, 其中键是 fieldName ,值是 fieldValue。

  1. CREATE TABLE sink (
  2. key STRING,
  3. fieldValueMap MAP<STRING,STRING>,
  4. PRIMARY KEY (`key`) NOT ENFORCED
  5. ) WITH (
  6. 'connector' = 'redis-inlong',
  7. 'sink.batch-size' = '1',
  8. 'format' = 'csv',
  9. 'data-type' = 'HASH',
  10. 'schema-mapping-mode' = 'DYNAMIC',
  11. 'redis-mode' = 'standalone',
  12. 'host' = 'localhost',
  13. 'port' = '6379',
  14. 'maxIdle' = '8',
  15. 'minIdle' = '1',
  16. 'maxTotal' = '2',
  17. 'timeout' = '2000'
  18. )"

BitMap with Static KV Pair

c1c2c3c4c5c6c7
rowKeyfield1: Longvalue 1:Booleanfield 2: Longvalue 2:Booleanfield 3: Longvalue 3:Boolean

第一个元素是 Redis 行键,必须是字符串类型。 奇数元素 (‘ c2 ‘ / ‘ c4 ‘ / ‘ c6 ‘) 是位图数据类型中的 Redis 偏移量,必须是 Long 类型。 偶数元素 (‘ c3 ‘ / ‘ c5 ‘ / ‘ c7 ‘) 是位图数据类型中的 Redis 值,必须是布尔类型。

  1. CREATE TABLE sink (
  2. key STRING,
  3. offset_1 BIGINT,
  4. value_1 BOOLEAN,
  5. offset_2 BIGINT,
  6. value_2 BOOLEAN,
  7. PRIMARY KEY (`key`) NOT ENFORCED
  8. ) WITH (
  9. 'connector' = 'redis-inlong',
  10. 'sink.batch-size' = '1',
  11. 'format' = 'csv',
  12. 'data-type' = 'BITMAP',
  13. 'schema-mapping-mode' = 'STATIC_KV_PAIR',
  14. 'redis-mode' = 'standalone',
  15. 'host' = 'localhost',
  16. 'port' = '6379',
  17. 'maxIdle' = '8',
  18. 'minIdle' = '1',
  19. 'maxTotal' = '2',
  20. 'timeout' = '2000'
  21. )