Monitor Metrics

Overview

Sort can compute metrics for each node. To enable them, the user only needs to add the option inlong.metric to the node, with a value of the form groupId&streamId&nodeId. Sort exports the metrics through the Flink metric group, so users can collect the metric data with any Flink metric reporter.
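The naming scheme can be sketched in Python. This is a minimal illustration, assuming that Sort builds each metric name by joining the three parts of inlong.metric with underscores and appending the metric suffix, as the groupId_streamId_nodeId_<metric> names in the tables below suggest. The helpers parse_inlong_metric and metric_names are hypothetical and not part of Sort.

```python
from typing import List, Tuple


# Hypothetical helper: not part of InLong Sort, it only illustrates the naming scheme.
def parse_inlong_metric(value: str) -> Tuple[str, str, str]:
    """Split an inlong.metric value of the form groupId&streamId&nodeId."""
    parts = value.split("&")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected groupId&streamId&nodeId, got {value!r}")
    group_id, stream_id, node_id = parts
    return group_id, stream_id, node_id


def metric_names(value: str, suffixes: List[str]) -> List[str]:
    """Build names like groupId_streamId_nodeId_numRecordsIn (assumed scheme)."""
    prefix = "_".join(parse_inlong_metric(value))
    return [f"{prefix}_{suffix}" for suffix in suffixes]


if __name__ == "__main__":
    # Print the names expected for one extract node.
    for name in metric_names("mysqlGroup&mysqlStream&mysqlNode1",
                             ["numRecordsIn", "numBytesIn"]):
        print(name)
```

Validating that the value has exactly three non-empty parts catches a common mistake early, such as a missing nodeId or a stray separator.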

Metric

Supported extract nodes

| Metric name | Extract nodes | Description |
|---|---|---|
| groupId_streamId_nodeId_numRecordsIn | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | Number of input records |
| groupId_streamId_nodeId_numBytesIn | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | Number of input bytes |
| groupId_streamId_nodeId_numRecordsInPerSecond | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | Input records per second |
| groupId_streamId_nodeId_numBytesInPerSecond | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | Input bytes per second |

Supported load nodes

| Metric name | Load nodes | Description |
|---|---|---|
| groupId_streamId_nodeId_numRecordsOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | Number of output records |
| groupId_streamId_nodeId_numBytesOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | Number of output bytes |
| groupId_streamId_nodeId_numRecordsOutPerSecond | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | Output records per second |
| groupId_streamId_nodeId_numBytesOutPerSecond | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | Output bytes per second |

Usage

The following example syncs data from MySQL to PostgreSQL and shows how to enable and read the metrics.

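  • Use Flink SQL to define the MySQL extract node and the PostgreSQL load node, each with its own inlong.metric option, and connect them: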
```sql
-- Extract node: reads the `user` table from MySQL.
-- Column types must match the load table below, because the INSERT copies them directly.
CREATE TABLE `table_groupId_streamId_nodeId1` (
  `id` INT,
  `name` STRING,
  `age` INT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc-inlong',
  'hostname' = 'xxxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'database-name' = 'test',
  'scan.incremental.snapshot.enabled' = 'true',
  'server-time-zone' = 'GMT+8',
  'table-name' = 'user',
  'inlong.metric' = 'mysqlGroup&mysqlStream&mysqlNode1'
);

-- Load node: writes to the PostgreSQL table public.user over JDBC.
-- The primary key constraint follows the column definitions.
CREATE TABLE `table_groupId_streamId_nodeId2` (
  `id` INT,
  `name` STRING,
  `age` INT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc-inlong',
  'url' = 'jdbc:postgresql://ip:5432/postgres',
  'username' = 'postgres',
  'password' = 'inlong',
  'table-name' = 'public.user',
  'inlong.metric' = 'pggroup&pgStream&pgNode'
);

-- Copy every row from the MySQL extract node to the PostgreSQL load node.
INSERT INTO `table_groupId_streamId_nodeId2`
SELECT `id`, `name`, `age`
FROM `table_groupId_streamId_nodeId1`;
```
  • Add a metric reporter to flink-conf.yaml. The following configuration pushes the metrics to a Prometheus Pushgateway every 60 seconds:
```yaml
# The reporter list key is metrics.reporters (plural "metrics").
metrics.reporters: promgateway
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: ip
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.interval: 60 SECONDS
```

Replace ip and the port with the host and port of your Pushgateway (9091 is the Pushgateway default port).

  • After the Flink SQL job starts, open http://ip:port of the Pushgateway to view the metrics. When the reporter is org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter, every metric name is prefixed with flink_taskmanager_job_task_operator_, so the full metric names are (with groupId, streamId and nodeId taken from inlong.metric):
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsIn
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesIn
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsInPerSecond
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesInPerSecond
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOut
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOut
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOutPerSecond
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOutPerSecond
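Besides its web UI, the Pushgateway serves everything it holds in the Prometheus text format at http://ip:port/metrics. The Python sketch below downloads that page and sums each Sort metric across all series with the same name (for example, one series per parallel subtask). It is a simplified parser written for illustration, assuming label values never contain a closing brace; fetch_pushgateway and sum_sort_metrics are hypothetical helper names, and a Prometheus server or a full exposition-format parser is the more robust choice in production.

```python
import urllib.request
from collections import defaultdict
from typing import Dict

# Prefix added by PrometheusPushGatewayReporter to operator-scope metrics.
PREFIX = "flink_taskmanager_job_task_operator_"


def sum_sort_metrics(exposition: str, prefix: str = PREFIX) -> Dict[str, float]:
    """Sum the samples of every metric whose name starts with `prefix`.

    Returns names with the prefix removed, e.g. g_s_n_numRecordsIn.
    Simplified parser: skips comment lines and assumes label values
    contain no '}' characters.
    """
    totals: Dict[str, float] = defaultdict(float)
    for line in exposition.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # blank line or # HELP / # TYPE comment
        if "{" in line:
            name = line[:line.index("{")]
            rest = line[line.rindex("}") + 1:].split()
        else:
            name, *rest = line.split()
        if name.startswith(prefix) and rest:
            # rest[0] is the sample value; an optional timestamp may follow.
            totals[name[len(prefix):]] += float(rest[0])
    return dict(totals)


def fetch_pushgateway(host: str, port: int = 9091) -> str:
    """Download all metrics currently held by the Pushgateway."""
    url = f"http://{host}:{port}/metrics"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return resp.read().decode("utf-8")
```

With the example job running, sum_sort_metrics(fetch_pushgateway("ip")) would return entries keyed by names such as mysqlGroup_mysqlStream_mysqlNode1_numRecordsIn, if the naming scheme described above holds.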