Customize Flink Metrics in Sort-Connector

Overview

The InLong Sort framework lets users define and insert custom Flink metrics within connectors to monitor the data processing pipeline closely. These metrics typically track key performance indicators such as serialization and deserialization success and failure counts, latency, snapshot state, and transaction completion status. They are recorded and reported through SourceExactlyMetric and SinkExactlyMetric objects at the appropriate points in the processing logic.

To create and insert a custom Flink metric in a connector, follow the steps below. They use the deserialization error count (numDeserializeError) in inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc as a running example.

1. Create the Metric Object

First, add a new Flink metric object in the SourceExactlyMetric or SinkExactlyMetric class. Metric objects can typically be of types like Counter, Gauge, or Meter. In this example, a Counter is created to track deserialization errors and is added as a class member:

  private Counter numDeserializeError;
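For orientation, the metric kinds differ in what they report: a counter accumulates a count, a gauge returns a current value each time it is read, and a meter reports a rate. The sketch below illustrates the first two. It assumes hypothetical stand-in interfaces (SimpleCounter, SimpleGauge) instead of Flink's own types in org.apache.flink.metrics, so it compiles without Flink on the classpath; it is not how the connector itself declares metrics.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-ins; Flink's real interfaces live in org.apache.flink.metrics.
interface SimpleCounter {
    void inc();
    long getCount();
}

interface SimpleGauge<T> {
    T getValue();
}

class MetricKindsSketch {
    // A counter only accumulates events, for example deserialization errors.
    static SimpleCounter newCounter() {
        return new SimpleCounter() {
            private long count;

            @Override
            public void inc() {
                count++;
            }

            @Override
            public long getCount() {
                return count;
            }
        };
    }

    // A gauge samples some current state each time it is read, for example a queue size.
    static SimpleGauge<Integer> queueSizeGauge(Queue<String> queue) {
        return queue::size;
    }

    public static void main(String[] args) {
        SimpleCounter errors = newCounter();
        errors.inc();
        errors.inc();

        Queue<String> pending = new ArrayDeque<>();
        SimpleGauge<Integer> backlog = queueSizeGauge(pending);
        pending.add("record-1");

        System.out.println("errors=" + errors.getCount() + ", backlog=" + backlog.getValue());
    }
}
```

An error count only ever grows, which is why a counter fits numDeserializeError in this example.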

2. Implement the registerMetricsForXXX Method

To initialize and register this metric object, write a registerMetricsForNumDeserializeError method. Within this method, the Counter object is registered with Flink’s metric system using registerCounter, allowing Flink to track the metric.

  public void registerMetricsForNumDeserializeError(Counter counter) {
      numDeserializeError = registerCounter("numDeserializeError", counter);
  }

The Counter returned by registerCounter is stored in the numDeserializeError field, so later calls operate on the registered metric.

3. Call the Registration Method in the Constructor

Within the class constructor, call the registration method with the MetricOption and MetricGroup parameters. This ensures the metric object is properly initialized and registered upon instantiation:

  public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
      this.metricGroup = metricGroup;
      this.labels = option.getLabels();
      registerMetricsForNumDeserializeError(new ThreadSafeCounter());
  }

Because registration happens in the constructor, every SourceExactlyMetric instance has its numDeserializeError counter initialized and ready to record deserialization errors.
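The constructor above passes a ThreadSafeCounter, which matters when a metric can be updated from more than one thread. The sketch below is not InLong's ThreadSafeCounter; it is a hypothetical AtomicLong-backed stand-in (AtomicCounterSketch) that shows the property such a counter needs: concurrent increments are never lost.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a thread-safe counter; not InLong's ThreadSafeCounter.
class AtomicCounterSketch {
    private final AtomicLong count = new AtomicLong();

    void inc() {
        count.incrementAndGet();
    }

    long getCount() {
        return count.get();
    }

    // Increments one shared counter from several threads and returns the final count.
    static long countConcurrently(int threads, int incrementsPerThread) {
        AtomicCounterSketch counter = new AtomicCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.inc();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return counter.getCount();
    }

    public static void main(String[] args) {
        // No increment is lost, so this prints 4000.
        System.out.println(countConcurrently(4, 1000));
    }
}
```

A plain long field incremented with ++ gives no such guarantee under concurrent updates, because the read-modify-write sequence is not atomic.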

4. Write the Metric’s Getter, Setter, and Operation Methods

To manipulate the numDeserializeError counter externally, implement the necessary getter and operation methods. In this case, an incNumDeserializeError method increments the counter whenever a deserialization error occurs:

  public void incNumDeserializeError() {
      if (numDeserializeError != null) {
          numDeserializeError.inc();
      }
  }

Call incNumDeserializeError wherever a deserialization error is detected. The null check keeps the call safe if the counter has not been registered.

5. Add the New Metric Output in the toString Method

To aid debugging and monitoring, and to keep the class complete, include the new metric in the toString method:

  @Override
  public String toString() {
      return "SourceMetricData{"
              // other fields omitted for brevity
              + ", numDeserializeError=" + numDeserializeError.getCount()
              + "}";
  }
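Steps 1 through 5 can be put together in one compilable sketch. Everything Flink- or InLong-specific is replaced by hypothetical stand-ins here: CounterStandIn for Flink's Counter, MetricGroupStandIn for the metric group, and SourceMetricSketch for SourceExactlyMetric. Registration is reduced to storing the counter in a map, which is an assumption made for illustration and not what registerCounter does internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Flink's Counter, reduced to what this sketch needs.
interface CounterStandIn {
    void inc();
    long getCount();
}

// Hypothetical stand-in for a metric group: it only remembers what was registered.
class MetricGroupStandIn {
    final Map<String, CounterStandIn> counters = new LinkedHashMap<>();

    CounterStandIn counter(String name, CounterStandIn counter) {
        counters.put(name, counter);
        return counter;
    }
}

class SourceMetricSketch {
    private final MetricGroupStandIn metricGroup;
    private CounterStandIn numDeserializeError; // step 1: the metric field

    // Step 3: register the metric when the object is constructed.
    SourceMetricSketch(MetricGroupStandIn metricGroup) {
        this.metricGroup = metricGroup;
        registerMetricsForNumDeserializeError(newAtomicCounter());
    }

    // Step 2: register the counter and keep the returned handle.
    void registerMetricsForNumDeserializeError(CounterStandIn counter) {
        numDeserializeError = metricGroup.counter("numDeserializeError", counter);
    }

    // Step 4: null-safe operation and getter.
    void incNumDeserializeError() {
        if (numDeserializeError != null) {
            numDeserializeError.inc();
        }
    }

    long getNumDeserializeError() {
        return numDeserializeError == null ? 0L : numDeserializeError.getCount();
    }

    // Step 5: report the value without risking a NullPointerException.
    @Override
    public String toString() {
        return "SourceMetricSketch{numDeserializeError=" + getNumDeserializeError() + "}";
    }

    private static CounterStandIn newAtomicCounter() {
        AtomicLong value = new AtomicLong();
        return new CounterStandIn() {
            @Override
            public void inc() {
                value.incrementAndGet();
            }

            @Override
            public long getCount() {
                return value.get();
            }
        };
    }
}
```

One design choice in the sketch differs from the snippet above: toString goes through the null-safe getter, so it still works if the counter was never registered.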

6. Insert the Custom Metric in Appropriate Places

After registering and initializing the metric, invoke it at the appropriate logic node. In this example, call incNumDeserializeError in the deserialization method to track each deserialization error. The following code shows how to implement this:

  @Override
  public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
      try {
          // Execute deserialization logic
      } catch (Exception e) {
          // Increment error count on deserialization failure
          // Ensure sourceExactlyMetric is not null
          if (sourceExactlyMetric != null) {
              sourceExactlyMetric.incNumDeserializeError();
          }
          throw e;
      }
  }

This method ensures that each deserialization error triggers incNumDeserializeError, accurately reflecting error frequency.
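The count-then-rethrow pattern can be checked in isolation before running a full pipeline. The sketch below is a simplified illustration under stated assumptions: ErrorMetricStandIn and CountingDeserializerSketch are hypothetical names, records are plain strings parsed as integers, and no Flink or Debezium types are involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the metric holder; only the error counter is modeled.
class ErrorMetricStandIn {
    private long numDeserializeError;

    void incNumDeserializeError() {
        numDeserializeError++;
    }

    long getNumDeserializeError() {
        return numDeserializeError;
    }
}

class CountingDeserializerSketch {
    private final ErrorMetricStandIn metric; // may be null when metrics are disabled

    CountingDeserializerSketch(ErrorMetricStandIn metric) {
        this.metric = metric;
    }

    // Parses the record as an integer; counts the failure and rethrows it.
    void deserialize(String record, List<Integer> out) {
        try {
            out.add(Integer.parseInt(record.trim()));
        } catch (RuntimeException e) {
            if (metric != null) {
                metric.incNumDeserializeError();
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        ErrorMetricStandIn metric = new ErrorMetricStandIn();
        CountingDeserializerSketch deserializer = new CountingDeserializerSketch(metric);
        List<Integer> out = new ArrayList<>();

        deserializer.deserialize("42", out);
        try {
            deserializer.deserialize("not-a-number", out);
        } catch (NumberFormatException expected) {
            // The error still propagates after being counted.
        }
        System.out.println("out=" + out + ", errors=" + metric.getNumDeserializeError());
    }
}
```

Rethrowing after incrementing keeps the existing failure handling intact; the metric only observes the error and does not swallow it.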

Testing and Verification

Use the sort-end-to-end-tests module, located in the inlong-sort/sort-end-to-end-tests/ directory, to verify the new metric:

  1. Set Metric Labels in SQL: Add inlong.metric.labels in the test SQL file to ensure Flink recognizes the metric labels. For example, in sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql:

    CREATE TABLE test_input1 (
        `id` INT primary key,
        name STRING,
        description STRING
    ) WITH (
        'connector' = 'postgres-cdc-inlong',
        'hostname' = 'postgres',
        'port' = '5432',
        'username' = 'flinkuser',
        'password' = 'flinkpw',
        'database-name' = 'test',
        'table-name' = 'test_input1',
        'schema-name' = 'public',
        'decoding.plugin.name' = 'pgoutput',
        'slot.name' = 'inlong_slot',
        'debezium.slot.name' = 'inlong_slot',
        -- Added portion
        'inlong.metric.labels' = 'groupId=pggroup&streamId=pgStream&nodeId=pgNode'
    );
    -- Keep Flink SQL for sink unchanged
  2. Configure Log Output for Metric Viewing: Enable metric log output in the test environment configuration to view results on the console:

    metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
    metrics.reporter.slf4j.interval: 5 SECONDS
  3. Run the End-to-end Test and Verify Output: Run the specific end-to-end test under the path inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15 and check whether numDeserializeError has the expected value:

    mvn test -Dtest=Postgres2StarRocksTest
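The inlong.metric.labels value set in step 1 is a list of key=value pairs joined by &. As an illustration of that format only, the sketch below splits such a string into a map. MetricLabelsSketch and parseLabels are hypothetical names, and this is not InLong's own parsing code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MetricLabelsSketch {
    // Splits "k1=v1&k2=v2" into an insertion-ordered map; rejects malformed pairs.
    static Map<String, String> parseLabels(String labels) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : labels.split("&")) {
            int eq = pair.indexOf('=');
            if (eq <= 0 || eq == pair.length() - 1) {
                throw new IllegalArgumentException("Malformed label: '" + pair + "'");
            }
            result.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints {groupId=pggroup, streamId=pgStream, nodeId=pgNode}
        System.out.println(parseLabels("groupId=pggroup&streamId=pgStream&nodeId=pgNode"));
    }
}
```

A quick check like this can catch a mistyped label string, such as a missing = or a stray &, before the end-to-end test is started.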

Note: You may want to insert test code or construct specific data to trigger incNumDeserializeError() and ensure your metrics are functioning as expected.

Notes

  • Pass MetricGroup When Creating Metrics: Ensure that when creating SourceExactlyMetric or SinkExactlyMetric, you pass a MetricGroup obtained via runtimeContext to avoid registration failures.
  • Check for Non-Null MetricOption: Validate that MetricOption is non-null before creating metric objects to avoid null pointer exceptions due to missing inlong.metric.labels.
  • Handle Null Pointers: Check that the SourceExactlyMetric or SinkExactlyMetric object is non-null before calling custom metric methods such as incNumDeserializeSuccess(), to avoid null pointer exceptions when 'inlong.metric.labels' isn’t specified.
  • End-to-end Test Coverage: If a new connector metric isn’t covered by an end-to-end test, create a test to verify metric reporting functionality.
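The two null-related notes can be combined into one pattern: create the metric object only when labels are configured, and guard every call site. The sketch below is a minimal illustration with hypothetical names (OptionalMetricStandIn, NullSafeMetricSketch, createMetricOrNull, reportError); the real connector decides this from its MetricOption, which is not modeled here.

```java
// Hypothetical stand-in for the metric holder.
class OptionalMetricStandIn {
    private long numDeserializeError;

    void incNumDeserializeError() {
        numDeserializeError++;
    }

    long getNumDeserializeError() {
        return numDeserializeError;
    }
}

class NullSafeMetricSketch {
    // Creates the metric only when labels were configured; otherwise returns null,
    // mimicking a connector that runs without 'inlong.metric.labels'.
    static OptionalMetricStandIn createMetricOrNull(String labels) {
        if (labels == null || labels.trim().isEmpty()) {
            return null;
        }
        return new OptionalMetricStandIn();
    }

    // Call sites guard against the metric being absent.
    static void reportError(OptionalMetricStandIn metric) {
        if (metric != null) {
            metric.incNumDeserializeError();
        }
    }

    public static void main(String[] args) {
        OptionalMetricStandIn enabled = createMetricOrNull("groupId=pggroup&streamId=pgStream&nodeId=pgNode");
        OptionalMetricStandIn disabled = createMetricOrNull(null);

        reportError(enabled);
        reportError(disabled); // safe no-op: no labels, no metric

        System.out.println("enabled errors=" + enabled.getNumDeserializeError()
                + ", disabled metric is null=" + (disabled == null));
    }
}
```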

This approach allows the insertion of custom Flink metrics in the Postgres connector, verified by testing, to enhance observability.