Dirty Data Archive

Overview

Dirty data refers to data that cannot be extracted, transformed, or loaded correctly due to quality problems in the data itself, such as mismatched field types, lengths, or field counts, serialization or deserialization exceptions, or a missing target database or table. Dirty data archiving dumps the dirty data produced while an InLong data flow is running to third-party storage, making it easier for businesses to locate problems. Sort currently supports archiving dirty data to Log and S3 targets, and supports dirty data archiving for common data sources such as Kafka, Doris, Iceberg, HBase, Hive, Elasticsearch, and JDBC.

Supported Nodes

| Type         | Name           | Archive Target System |
| ------------ | -------------- | --------------------- |
| Extract Node | Kafka          | Log, S3               |
| Load Node    | Hive           | Log, S3               |
|              | Kafka          | Log, S3               |
|              | HBase          | Log, S3               |
|              | ClickHouse     | Log, S3               |
|              | Iceberg        | Log, S3               |
|              | Elasticsearch  | Log, S3               |
|              | PostgreSQL     | Log, S3               |
|              | HDFS           | Log, S3               |
|              | TDSQL Postgres | Log, S3               |
|              | Doris          | Log, S3               |
|              | SQL Server     | Log, S3               |
|              | Greenplum      | Log, S3               |

Dirty Data Format

During dirty data archiving, the following system variables are defined for formatting the dirty data:

  • SYSTEM_TIME: the system time, in the format 'yyyy-MM-dd HH:mm:ss'
  • DIRTY_TYPE: the dirty data type; common types are SerializeError, DeserializeError, DataTypeMappingError, etc.
  • DIRTY_MESSAGE: the dirty data message, i.e. the cause of the dirty data, the exception information, etc.

The format for archiving to Log:

  • [${dirty.side-output.log-tag}] ${value}, where ${value} is the merged value of ${dirty.side-output.labels} and the dirty data, formatted as 'csv' or 'json'
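
For example, with 'dirty.side-output.log-tag' = 'DirtyData', 'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user', and the 'csv' format, an archived log line looks like this (taken from the usage example below):

```
[DirtyData] 2023-01-30 13:01:01 ValueDeserializeError,inlong,user,1,zhangsan,18
```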

The format for archiving to S3:

  • The filename generation format of S3: ${dirty.side-output.s3.key}/${dirty.identifier}-${random-sequence}.${suffix}
  • The format of the data in the S3 file: ${dirty.side-output.labels} and the dirty data are merged and formatted as 'csv' or 'json'
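
For example, with 'dirty.side-output.s3.key' = 'dirty/test' and 'dirty.identifier' = 'inlong-user-${SYSTEM_TIME}', the generated file path looks like the following, where xxxx stands for the 4-digit random sequence (taken from the usage example below):

```
dirty/test/inlong-user-2023-01-01130101xxxx.txt
```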

Note: for ${dirty.side-output.log-tag}, ${dirty.side-output.labels}, ${dirty.identifier}, and ${dirty.side-output.s3.key}, see the configuration details below.

Configuration

Common Configuration

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| dirty.ignore | optional | false | Boolean | Whether to ignore dirty data, the default is 'false'. |
| dirty.side-output.enable | optional | false | Boolean | Whether to support dirty data archiving, the default is 'false'. |
| dirty.side-output.connector | required | (none) | String | The connector of the dirty data archive; it must be set when 'dirty.side-output.enable' is 'true'. Currently only 'log' and 's3' are supported. |
| dirty.side-output.format | optional | csv | String | The format of the dirty data archive; currently only 'csv' and 'json' are supported, the default is 'csv'. |
| dirty.side-output.log.enable | optional | true | Boolean | Whether to also print a log when dirty data is archived, the default is 'true'. |
| dirty.side-output.ignore-errors | optional | true | Boolean | Whether to ignore errors during dirty data archiving, the default is 'true'. |
| dirty.identifier | required | (none) | String | The identifier of the dirty data. It is used for filename generation of a file archive, topic generation of an MQ archive, table name generation of a database archive, etc. It supports variable replacement like '${variable}'. Several system variables (SYSTEM_TIME, DIRTY_TYPE, DIRTY_MESSAGE) are currently supported; support for other variables is determined by the connector. |
| dirty.side-output.log-tag | optional | (none) | String | The log tag of the dirty data, used for log printing. It supports variable replacement like '${variable}'. Several system variables (SYSTEM_TIME, DIRTY_TYPE, DIRTY_MESSAGE) are currently supported; support for other variables is determined by the connector. |
| dirty.side-output.labels | optional | (none) | String | The labels of the dirty data, used for log printing and archived together with the dirty data. It supports variable replacement like '${variable}'. Several system variables (SYSTEM_TIME, DIRTY_TYPE, DIRTY_MESSAGE) are currently supported; support for other variables is determined by the connector. |
| dirty.side-output.field-delimiter | optional | , | String | The column separator of the dirty data archive, used for the 'csv' format, the default is ','. |
| dirty.side-output.line-delimiter | optional | \n | String | The row separator of the dirty data archive, used for the 'csv' and 'json' formats, the default is '\n'. |
| dirty.side-output.batch.size | optional | 100 | Integer | The cache batch size of the dirty data archive, the default is '100'. |
| dirty.side-output.batch.bytes | optional | 10240 | Integer | The cache batch size in bytes of the dirty data archive, the default is '10240' (10 KB). |
| dirty.side-output.retries | optional | 3 | Integer | The number of retries of the dirty data archive, the default is '3'. |
| dirty.side-output.batch.interval | optional | 60000 | Integer | The flush interval of the dirty data archive in milliseconds, the default is '60000' (60 s). |
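
Putting the common options together, a minimal WITH-clause fragment that enables archiving to the log might look like the following sketch (values are illustrative; connector-specific options are omitted):

```sql
'dirty.ignore' = 'true',                    -- ignore dirty data in the main flow
'dirty.side-output.enable' = 'true',        -- enable dirty data archiving
'dirty.side-output.connector' = 'log',      -- archive target: 'log' or 's3'
'dirty.side-output.format' = 'csv',         -- archive format: 'csv' or 'json'
'dirty.side-output.log-tag' = 'DirtyData',  -- tag prepended to each archived line
'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}'
```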

S3 Archive Configuration

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| dirty.side-output.s3.endpoint | required | (none) | String | The endpoint of the S3 archive. |
| dirty.side-output.s3.region | required | (none) | String | The region of the S3 archive. |
| dirty.side-output.s3.bucket | required | (none) | String | The bucket of the S3 archive. |
| dirty.side-output.s3.key | required | (none) | String | The key (path prefix) of the S3 archive. |
| dirty.side-output.s3.access-key-id | optional | (none) | String | The access key id of the S3 archive; if not set here, it must be provided by the environment. |
| dirty.side-output.s3.secret-key-id | optional | (none) | String | The secret key of the S3 archive; if not set here, it must be provided by the environment. |
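
These options are combined with the common ones. Taking the values from the S3 usage example below, the S3-specific part of a WITH clause looks like this (if the key pair is omitted, the credentials must be available from the environment):

```sql
'dirty.side-output.connector' = 's3',
'dirty.side-output.s3.endpoint' = 's3.test.endpoint',
'dirty.side-output.s3.region' = 'region',
'dirty.side-output.s3.bucket' = 's3-test-bucket',
'dirty.side-output.s3.key' = 'dirty/test',
'dirty.side-output.s3.access-key-id' = 'access_key_id',
'dirty.side-output.s3.secret-key-id' = 'secret_key_id',
'dirty.identifier' = 'inlong-user-${SYSTEM_TIME}'
```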

Usage

The following example syncs Kafka data to Kafka and demonstrates the usage of dirty data archiving (usage on other nodes is similar).

  • The usage for archiving to Log:
```sql
CREATE TABLE `table_user_input`(
    `id` INT,
    `name` INT,
    `age` STRING)
WITH (
    'dirty.side-output.connector' = 'log',
    'dirty.ignore' = 'true',
    'dirty.side-output.enable' = 'true',
    'dirty.side-output.format' = 'csv',
    'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user',
    'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=1',
    'topic' = 'user_input',
    'properties.bootstrap.servers' = 'localhost:9092',
    'connector' = 'kafka-inlong',
    'scan.startup.mode' = 'earliest-offset',
    'json.timestamp-format.standard' = 'SQL',
    'json.encode.decimal-as-plain-number' = 'true',
    'json.map-null-key.literal' = 'null',
    'json.ignore-parse-errors' = 'false',
    'json.map-null-key.mode' = 'DROP',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'properties.group.id' = 'test_group');

CREATE TABLE `table_user_output`(
    `id` INT,
    `name` STRING,
    `age` INT)
WITH (
    'topic' = 'user_output',
    'properties.bootstrap.servers' = 'localhost:9092',
    'connector' = 'kafka-inlong',
    'sink.ignore.changelog' = 'true',
    'json.timestamp-format.standard' = 'SQL',
    'json.encode.decimal-as-plain-number' = 'true',
    'json.map-null-key.literal' = 'null',
    'json.ignore-parse-errors' = 'true',
    'json.map-null-key.mode' = 'DROP',
    'format' = 'json',
    'json.fail-on-missing-field' = 'true',
    'dirty.ignore' = 'true',
    'dirty.side-output.connector' = 'log',
    'dirty.side-output.enable' = 'true',
    'dirty.side-output.format' = 'csv',
    'dirty.side-output.log.enable' = 'true',
    'dirty.side-output.log-tag' = 'DirtyData',
    'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user');

INSERT INTO `table_user_output`
SELECT
    `id`,
    `name`,
    `age`
FROM `table_user_input`;

-- In this example, we deliberately input a piece of data in non-json format,
-- such as: 1,zhangsan,18. The following dirty data will then be printed in the
-- log according to the configuration:
-- [DirtyData] 2023-01-30 13:01:01 ValueDeserializeError,inlong,user,1,zhangsan,18
```

  • The usage for archiving to S3:
```sql
CREATE TABLE `table_user_input`(
    `id` INT,
    `name` INT,
    `age` STRING)
WITH (
    'dirty.side-output.connector' = 's3',
    'dirty.ignore' = 'true',
    'dirty.side-output.enable' = 'true',
    'dirty.side-output.format' = 'csv',
    'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user',
    'dirty.side-output.s3.bucket' = 's3-test-bucket',
    'dirty.side-output.s3.endpoint' = 's3.test.endpoint',
    'dirty.side-output.s3.key' = 'dirty/test',
    'dirty.side-output.s3.region' = 'region',
    'dirty.side-output.s3.access-key-id' = 'access_key_id',
    'dirty.side-output.s3.secret-key-id' = 'secret_key_id',
    'dirty.identifier' = 'inlong-user-${SYSTEM_TIME}',
    'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=1',
    'topic' = 'user_input',
    'properties.bootstrap.servers' = 'localhost:9092',
    'connector' = 'kafka-inlong',
    'scan.startup.mode' = 'earliest-offset',
    'json.timestamp-format.standard' = 'SQL',
    'json.encode.decimal-as-plain-number' = 'true',
    'json.map-null-key.literal' = 'null',
    'json.ignore-parse-errors' = 'false',
    'json.map-null-key.mode' = 'DROP',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'properties.group.id' = 'test_group');

CREATE TABLE `table_user_output`(
    `id` INT,
    `name` STRING,
    `age` INT)
WITH (
    'topic' = 'user_output',
    'properties.bootstrap.servers' = 'localhost:9092',
    'connector' = 'kafka-inlong',
    'sink.ignore.changelog' = 'true',
    'json.timestamp-format.standard' = 'SQL',
    'json.encode.decimal-as-plain-number' = 'true',
    'json.map-null-key.literal' = 'null',
    'json.ignore-parse-errors' = 'true',
    'json.map-null-key.mode' = 'DROP',
    'format' = 'json',
    'json.fail-on-missing-field' = 'true',
    'dirty.side-output.connector' = 's3',
    'dirty.ignore' = 'true',
    'dirty.side-output.enable' = 'true',
    'dirty.side-output.format' = 'csv',
    'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user',
    'dirty.side-output.s3.bucket' = 's3-test-bucket',
    'dirty.side-output.s3.endpoint' = 's3.test.endpoint',
    'dirty.side-output.s3.key' = 'dirty/test',
    'dirty.side-output.s3.region' = 'region',
    'dirty.side-output.s3.access-key-id' = 'access_key_id',
    'dirty.side-output.s3.secret-key-id' = 'secret_key_id',
    'dirty.identifier' = 'inlong-user-${SYSTEM_TIME}');

INSERT INTO `table_user_output`
SELECT
    `id`,
    `name`,
    `age`
FROM `table_user_input`;

-- In this example, we deliberately input a piece of data in non-json format,
-- such as: 1,zhangsan,18. The following dirty data will then be written to S3
-- according to the configuration (the file path is
-- dirty/test/inlong-user-2023-01-01130101xxxx.txt, where xxxx is a 4-digit
-- random sequence):
-- [DirtyData] 2023-01-30 13:01:01 ValueDeserializeError,inlong,user,1,zhangsan,18
```
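
Both examples label each dirty record with the archive time and the dirty type. Since 'dirty.side-output.labels' also supports the DIRTY_MESSAGE system variable, the failure cause can be archived alongside the data; a sketch of such a labels setting for the jobs above:

```sql
-- Sketch: additionally capture the exception information with each dirty record
'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&DIRTY_MESSAGE=${DIRTY_MESSAGE}&database=inlong&table=user'
```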