# Monitor Metrics

## Overview

Sort computes metrics at the node level. To enable them, add the option `inlong.metric.labels` to a node, with the value `groupId={groupId}&streamId={streamId}&nodeId={nodeId}`. Sort exports the metrics through the Flink metric group, so any Flink metric reporter can be used to collect the metric data.
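For example, a minimal sketch of how the option is attached to a table's `WITH` clause (the table, columns, and label values here are placeholders, not part of any real deployment):

```sql
-- Hypothetical source table; the last option is what enables metric reporting.
CREATE TABLE `orders_source` (
    `id` INT,
    `amount` DOUBLE,
    PRIMARY KEY (`id`) NOT ENFORCED)
WITH (
    'connector' = 'mysql-cdc-inlong',
    'hostname' = 'localhost',
    'username' = 'xxx',
    'password' = 'xxx',
    'database-name' = 'test',
    'table-name' = 'orders',
    'inlong.metric.labels' = 'groupId=my_group&streamId=my_stream&nodeId=my_node'
);
```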

## Metric

### Supporting extract node

#### Node level metric

| Metric name | Extract node | Description |
|-------------|--------------|-------------|
| groupId_streamId_nodeId_numRecordsIn | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | input records number |
| groupId_streamId_nodeId_numBytesIn | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | input bytes number |
| groupId_streamId_nodeId_numRecordsInPerSecond | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | input records per second |
| groupId_streamId_nodeId_numBytesInPerSecond | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | input bytes per second |

#### Table level metric

These metrics are reported for whole-database synchronization.

| Metric name | Extract node | Description |
|-------------|--------------|-------------|
| groupId_streamId_nodeId_database_table_numRecordsIn | mysql-cdc | input records number |
| groupId_streamId_nodeId_database_schema_table_numRecordsIn | oracle-cdc, postgresql-cdc | input records number |
| groupId_streamId_nodeId_database_collection_numRecordsIn | mongodb-cdc | input records number |
| groupId_streamId_nodeId_database_table_numBytesIn | mysql-cdc | input bytes number |
| groupId_streamId_nodeId_database_schema_table_numBytesIn | oracle-cdc, postgresql-cdc | input bytes number |
| groupId_streamId_nodeId_database_collection_numBytesIn | mongodb-cdc | input bytes number |
| groupId_streamId_nodeId_database_table_numRecordsInPerSecond | mysql-cdc | input records per second |
| groupId_streamId_nodeId_database_schema_table_numRecordsInPerSecond | oracle-cdc, postgresql-cdc | input records per second |
| groupId_streamId_nodeId_database_collection_numRecordsInPerSecond | mongodb-cdc | input records per second |
| groupId_streamId_nodeId_database_table_numBytesInPerSecond | mysql-cdc | input bytes per second |
| groupId_streamId_nodeId_database_schema_table_numBytesInPerSecond | oracle-cdc, postgresql-cdc | input bytes per second |
| groupId_streamId_nodeId_database_collection_numBytesInPerSecond | mongodb-cdc | input bytes per second |

### Supporting load node

#### Node level metric

| Metric name | Load node | Description |
|-------------|-----------|-------------|
| groupId_streamId_nodeId_numRecordsOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | output records number |
| groupId_streamId_nodeId_numBytesOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | output bytes number |
| groupId_streamId_nodeId_numRecordsOutPerSecond | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | output records per second |
| groupId_streamId_nodeId_numBytesOutPerSecond | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | output bytes per second |
| groupId_streamId_nodeId_dirtyRecordsOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | output dirty records number |
| groupId_streamId_nodeId_dirtyBytesOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | output dirty bytes number |

#### Table level metric

| Metric name | Load node | Description |
|-------------|-----------|-------------|
| groupId_streamId_nodeId_database_table_numRecordsOut | doris, iceberg, starRocks | output records number |
| groupId_streamId_nodeId_database_schema_table_numRecordsOut | postgresql | output records number |
| groupId_streamId_nodeId_topic_numRecordsOut | kafka | output records number |
| groupId_streamId_nodeId_database_table_numBytesOut | doris, iceberg, starRocks | output bytes number |
| groupId_streamId_nodeId_database_schema_table_numBytesOut | postgresql | output bytes number |
| groupId_streamId_nodeId_topic_numBytesOut | kafka | output bytes number |
| groupId_streamId_nodeId_database_table_numRecordsOutPerSecond | doris, iceberg, starRocks | output records per second |
| groupId_streamId_nodeId_database_schema_table_numRecordsOutPerSecond | postgresql | output records per second |
| groupId_streamId_nodeId_topic_numRecordsOutPerSecond | kafka | output records per second |
| groupId_streamId_nodeId_database_table_numBytesOutPerSecond | doris, iceberg, starRocks | output bytes per second |
| groupId_streamId_nodeId_database_schema_table_numBytesOutPerSecond | postgresql | output bytes per second |
| groupId_streamId_nodeId_topic_numBytesOutPerSecond | kafka | output bytes per second |
| groupId_streamId_nodeId_database_table_dirtyRecordsOut | doris, iceberg, starRocks | output dirty records number |
| groupId_streamId_nodeId_database_schema_table_dirtyRecordsOut | postgresql | output dirty records number |
| groupId_streamId_nodeId_topic_dirtyRecordsOut | kafka | output dirty records number |
| groupId_streamId_nodeId_database_table_dirtyBytesOut | doris, iceberg, starRocks | output dirty bytes number |
| groupId_streamId_nodeId_database_schema_table_dirtyBytesOut | postgresql | output dirty bytes number |
| groupId_streamId_nodeId_topic_dirtyBytesOut | kafka | output dirty bytes number |

## Usage

The following example synchronizes MySQL data to PostgreSQL and shows how the metric option is used.

- Use Flink SQL to define the source and sink tables and start the sync job:

```sql
CREATE TABLE `table_groupId_streamId_nodeId1` (
    `id` INT,
    `name` STRING,
    `age` INT,
    PRIMARY KEY (`id`) NOT ENFORCED)
WITH (
    'connector' = 'mysql-cdc-inlong',
    'hostname' = 'xxxx',
    'username' = 'xxx',
    'password' = 'xxx',
    'database-name' = 'test',
    'scan.incremental.snapshot.enabled' = 'true',
    'server-time-zone' = 'GMT+8',
    'table-name' = 'user',
    'inlong.metric.labels' = 'groupId=xxgroup&streamId=xxstream&nodeId=xxnode'
);

CREATE TABLE `table_groupId_streamId_nodeId2` (
    `id` INT,
    `name` STRING,
    `age` INT,
    PRIMARY KEY (`id`) NOT ENFORCED)
WITH (
    'connector' = 'jdbc-inlong',
    'url' = 'jdbc:postgresql://ip:5432/postgres',
    'username' = 'postgres',
    'password' = 'inlong',
    'table-name' = 'public.user',
    'inlong.metric.labels' = 'groupId=pggroup&streamId=pgStream&nodeId=pgNode'
);

INSERT INTO `table_groupId_streamId_nodeId2`
SELECT
    `id`,
    `name`,
    `age`
FROM `table_groupId_streamId_nodeId1`;
```
- Add a metric reporter in `flink-conf.yaml`:

```yaml
metrics.reporters: promgateway
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: ip
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.interval: 60 SECONDS
```

Replace `ip` and `port` with the host and port of your Pushgateway.

- After the Flink SQL job is running, visit `http://ip:port` of the Pushgateway. When the reporter is `org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter`, each metric name is prefixed with `flink_taskmanager_job_task_operator`, so the full metric names are:
  flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsIn,
  flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesIn,
  flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsInPerSecond,
  flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesInPerSecond,
  flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOut,
  flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOut,
  flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOutPerSecond,
  flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOutPerSecond.
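If you also want Prometheus itself to collect these metrics, a minimal sketch of a scrape job for the Pushgateway in `prometheus.yml` could look like the following; the job name is arbitrary and the target address is an assumption matching the reporter settings above:

```yaml
scrape_configs:
  - job_name: 'inlong-sort-pushgateway'
    honor_labels: true          # keep the labels pushed by the Flink reporter
    static_configs:
      - targets: ['ip:9091']    # replace with your Pushgateway host:port
```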