# Monitor Metrics

## Overview

We have added metric computing for nodes. Sort computes metrics when the user adds the `inlong.metric.labels` option in the `WITH` clause. The value of `inlong.metric.labels` consists of three parts: `groupId={groupId}&streamId={streamId}&nodeId={nodeId}`. Users can then use a metric reporter to report the metric data.
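As a small sketch (these helper functions are hypothetical, not part of Sort), the label value can be built and parsed like this:

```python
# Hypothetical helpers for the 'inlong.metric.labels' option value.
# Format documented above: groupId={groupId}&streamId={streamId}&nodeId={nodeId}

def build_metric_labels(group_id: str, stream_id: str, node_id: str) -> str:
    """Build the value for the 'inlong.metric.labels' WITH option."""
    return f"groupId={group_id}&streamId={stream_id}&nodeId={node_id}"

def parse_metric_labels(value: str) -> dict:
    """Parse the label string back into a dict of label names to values."""
    return dict(pair.split("=", 1) for pair in value.split("&"))
```

For example, `build_metric_labels("xxgroup", "xxstream", "xxnode")` yields `groupId=xxgroup&streamId=xxstream&nodeId=xxnode`, the same shape used in the SQL example below.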

## Metrics

### Supported extract nodes

| Metric name | Extract node | Description |
|-------------|--------------|-------------|
| groupId_streamId_nodeId_numRecordsIn | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | Number of input records |
| groupId_streamId_nodeId_numBytesIn | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | Number of input bytes |
| groupId_streamId_nodeId_numRecordsInPerSecond | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | Number of input records per second |
| groupId_streamId_nodeId_numBytesInPerSecond | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | Number of input bytes per second |

### Supported table-level metrics

These metrics are used in the whole-database synchronization scenario.

| Metric name | Extract node | Description |
|-------------|--------------|-------------|
| groupId_streamId_nodeId_database_table_numRecordsIn | mysql-cdc | Number of input records |
| groupId_streamId_nodeId_database_schema_table_numRecordsIn | oracle-cdc, postgresql-cdc | Number of input records |
| groupId_streamId_nodeId_database_collection_numRecordsIn | mongodb-cdc | Number of input records |
| groupId_streamId_nodeId_database_table_numBytesIn | mysql-cdc | Number of input bytes |
| groupId_streamId_nodeId_database_schema_table_numBytesIn | oracle-cdc, postgresql-cdc | Number of input bytes |
| groupId_streamId_nodeId_database_collection_numBytesIn | mongodb-cdc | Number of input bytes |
| groupId_streamId_nodeId_database_table_numRecordsInPerSecond | mysql-cdc | Number of input records per second |
| groupId_streamId_nodeId_database_schema_table_numRecordsInPerSecond | oracle-cdc, postgresql-cdc | Number of input records per second |
| groupId_streamId_nodeId_database_collection_numRecordsInPerSecond | mongodb-cdc | Number of input records per second |
| groupId_streamId_nodeId_database_table_numBytesInPerSecond | mysql-cdc | Number of input bytes per second |
| groupId_streamId_nodeId_database_schema_table_numBytesInPerSecond | oracle-cdc, postgresql-cdc | Number of input bytes per second |
| groupId_streamId_nodeId_database_collection_numBytesInPerSecond | mongodb-cdc | Number of input bytes per second |
| groupId_streamId_nodeId_database_collection_numSnapshotCreate | postgresql-cdc, pulsar | Number of checkpoint creation attempts |
| groupId_streamId_nodeId_database_collection_numSnapshotError | postgresql-cdc, pulsar | Number of checkpoint creation failures |
| groupId_streamId_nodeId_database_collection_numSnapshotComplete | postgresql-cdc, pulsar | Number of checkpoints created successfully |
| groupId_streamId_nodeId_database_collection_snapshotToCheckpointTimeLag | postgresql-cdc, pulsar | Latency from the start of checkpoint creation to its completion (ms) |
| groupId_streamId_nodeId_database_collection_numDeserializeSuccess | postgresql-cdc, pulsar | Number of successful deserializations |
| groupId_streamId_nodeId_database_collection_numDeserializeError | postgresql-cdc, pulsar | Number of deserialization failures |
| groupId_streamId_nodeId_database_collection_deserializeTimeLag | postgresql-cdc, pulsar | Deserialization latency (ms) |

### Supported load nodes

| Metric name | Load node | Description |
|-------------|-----------|-------------|
| groupId_streamId_nodeId_numRecordsOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | Number of output records |
| groupId_streamId_nodeId_numBytesOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | Number of output bytes |
| groupId_streamId_nodeId_numRecordsOutPerSecond | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | Number of output records per second |
| groupId_streamId_nodeId_numBytesOutPerSecond | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | Number of output bytes per second |
| groupId_streamId_nodeId_dirtyRecordsOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | Number of dirty output records |
| groupId_streamId_nodeId_dirtyBytesOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | Number of dirty output bytes |

### Supported table-level metrics

These metrics are used in the whole-database synchronization scenario.

| Metric name | Load node | Description |
|-------------|-----------|-------------|
| groupId_streamId_nodeId_database_table_numRecordsOut | doris, iceberg, starRocks | Number of output records |
| groupId_streamId_nodeId_database_schema_table_numRecordsOut | postgresql | Number of output records |
| groupId_streamId_nodeId_topic_numRecordsOut | kafka | Number of output records |
| groupId_streamId_nodeId_database_table_numBytesOut | doris, iceberg, starRocks | Number of output bytes |
| groupId_streamId_nodeId_database_schema_table_numBytesOut | postgresql | Number of output bytes |
| groupId_streamId_nodeId_topic_numBytesOut | kafka | Number of output bytes |
| groupId_streamId_nodeId_database_table_numRecordsOutPerSecond | doris, iceberg, starRocks | Number of output records per second |
| groupId_streamId_nodeId_database_schema_table_numRecordsOutPerSecond | postgresql | Number of output records per second |
| groupId_streamId_nodeId_topic_numRecordsOutPerSecond | kafka | Number of output records per second |
| groupId_streamId_nodeId_database_table_numBytesOutPerSecond | doris, iceberg, starRocks | Number of output bytes per second |
| groupId_streamId_nodeId_database_schema_table_numBytesOutPerSecond | postgresql | Number of output bytes per second |
| groupId_streamId_nodeId_topic_numBytesOutPerSecond | kafka | Number of output bytes per second |
| groupId_streamId_nodeId_database_table_dirtyRecordsOut | doris, iceberg, starRocks | Number of dirty output records |
| groupId_streamId_nodeId_database_schema_table_dirtyRecordsOut | postgresql | Number of dirty output records |
| groupId_streamId_nodeId_topic_dirtyRecordsOut | kafka | Number of dirty output records |
| groupId_streamId_nodeId_database_table_dirtyBytesOut | doris, iceberg, starRocks | Number of dirty output bytes |
| groupId_streamId_nodeId_database_schema_table_dirtyBytesOut | postgresql | Number of dirty output bytes |
| groupId_streamId_nodeId_topic_dirtyBytesOut | kafka | Number of dirty output bytes |
| groupId_streamId_nodeId_numSerializeSuccess | starRocks | Number of successful serializations |
| groupId_streamId_nodeId_numSerializeError | starRocks | Number of serialization failures |
| groupId_streamId_nodeId_serializeTimeLag | starRocks | Serialization latency (ms) |
| groupId_streamId_nodeId_numSnapshotCreate | starRocks | Number of checkpoint creation attempts |
| groupId_streamId_nodeId_numSnapshotError | starRocks | Number of checkpoint creation failures |
| groupId_streamId_nodeId_numSnapshotComplete | starRocks | Number of checkpoints created successfully |
| groupId_streamId_nodeId_snapshotToCheckpointTimeLag | starRocks | Latency from the start of checkpoint creation to its completion (ms) |

## Usage

This section walks through an example of synchronizing MySQL data to PostgreSQL and shows how to use the metrics.

- Usage in Flink SQL:

```sql
CREATE TABLE `table_groupId_streamId_nodeId1`(
    `id` INT,
    `name` STRING,
    `age` INT,
    PRIMARY KEY(`id`) NOT ENFORCED)
WITH (
    'connector' = 'mysql-cdc-inlong',
    'hostname' = 'xxxx',
    'username' = 'xxx',
    'password' = 'xxx',
    'database-name' = 'test',
    'scan.incremental.snapshot.enabled' = 'true',
    'server-time-zone' = 'GMT+8',
    'table-name' = 'user',
    'inlong.metric.labels' = 'groupId=mysqlGroup&streamId=mysqlStream&nodeId=mysqlNode1'
);
CREATE TABLE `table_groupId_streamId_nodeId2`(
    `id` INT,
    `name` STRING,
    `age` INT,
    PRIMARY KEY (`id`) NOT ENFORCED)
WITH (
    'connector' = 'jdbc-inlong',
    'url' = 'jdbc:postgresql://ip:5432/postgres',
    'username' = 'postgres',
    'password' = 'inlong',
    'table-name' = 'public.user',
    'inlong.metric.labels' = 'groupId=xxgroup&streamId=xxstream&nodeId=xxnode'
);
INSERT INTO `table_groupId_streamId_nodeId2`
SELECT
    `id`,
    `name`,
    `age`
FROM `table_groupId_streamId_nodeId1`;
```
- To report metrics to an external system, add a metric reporter configuration in `flink-conf.yaml` (taking Prometheus as an example):

```yaml
metrics.reporters: promgateway
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: ip
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.interval: 60 SECONDS
```

Here `ip` and `port` are the host and port of your pushgateway.

- After executing the SQL above, we can visit the pushgateway URL: `http://ip:port`.

When the metric reporter in use is `org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter`, metric names are prefixed with `flink_taskmanager_job_task_operator`.
The full metric names then look like this:
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsIn,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesIn,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsInPerSecond,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesInPerSecond,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOut,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOut,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOutPerSecond,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOutPerSecond.
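As a sketch of how these names are composed (a hypothetical helper, not part of Sort or Flink), the reporter prefix, the three label parts, and the base metric name are simply joined with underscores:

```python
# Hypothetical helper: assemble the full metric name as exported through the
# PrometheusPushGatewayReporter, which prefixes Sort metric names with
# "flink_taskmanager_job_task_operator".

PREFIX = "flink_taskmanager_job_task_operator"

def full_metric_name(group_id: str, stream_id: str, node_id: str, metric: str) -> str:
    """Join the reporter prefix, label parts, and base metric name."""
    return "_".join([PREFIX, group_id, stream_id, node_id, metric])
```

For instance, `full_metric_name("mysqlGroup", "mysqlStream", "mysqlNode1", "numRecordsIn")` gives the name you would query in Prometheus for the MySQL extract node of the example job.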