Offline Sync Connector Extension

Overview

Apache InLong is a powerful data synchronization tool that supports both real-time and offline synchronization, relying on Flink as its underlying engine. Through a unified Flink SQL API, users can handle both types of data synchronization tasks using the same code.

The difference between the two is that real-time synchronization uses Flink Streaming, while offline synchronization uses Flink Batch. In practice, users can choose the appropriate synchronization method based on their needs.

This article describes how to extend offline synchronization connector plugins and how to extend third-party scheduling services.

Offline connector extension

Offline synchronization, like real-time synchronization, mainly consists of two parts: Source and Sink. The biggest difference lies in whether the Source is bounded:

  • The Source for real-time synchronization is unbounded.
  • The Source for offline synchronization is bounded.

Bounded means that the offline data source has a clear start and end, typically using batch processing for offline data synchronization. The offline data source reuses the Flink Connector from real-time synchronization and adds the property of whether the Source is bounded, while the implementation of the Sink is consistent with that of real-time synchronization.

Flink’s Source provides interfaces to set data boundaries:

```java
/**
 * Get the boundedness of this source.
 *
 * @return the boundedness of this source.
 */
Boundedness getBoundedness();
```

Boundedness is an enumeration type with two values: BOUNDED and CONTINUOUS_UNBOUNDED.

```java
@Public
public enum Boundedness {
    /**
     * A BOUNDED stream is a stream with finite records.
     */
    BOUNDED,
    /**
     * A CONTINUOUS_UNBOUNDED stream is a stream with infinite records.
     */
    CONTINUOUS_UNBOUNDED
}
```

Using the Pulsar Source as an example, the following describes how to set the boundedness property.

Data source boundaries

  • lowerBound: the starting position of the boundary.
  • upperBound: the ending position of the boundary.
  • boundaryType: the type of boundary; currently two types are supported, TIME and OFFSET.

```java
public class Boundaries {
    public String lowerBound;
    public String upperBound;
    public BoundaryType boundaryType;
}
```

The boundary information is carried by the ExtractNode, which corresponds to the Source in Flink.

```java
public abstract class ExtractNode implements Node {
    public void fillInBoundaries(Boundaries boundaries) {
        Preconditions.checkNotNull(boundaries, "boundaries is null");
        // every single kind of extract node should provide the way to fill in boundaries individually
    }
}
```

Setting Boundaries for the Source

In PulsarExtractNode, the Boundaries information is configured into the relevant options of the Pulsar Connector:

```java
@Override
public void fillInBoundaries(Boundaries boundaries) {
    super.fillInBoundaries(boundaries);
    BoundaryType boundaryType = boundaries.getBoundaryType();
    String lowerBoundary = boundaries.getLowerBound();
    String upperBoundary = boundaries.getUpperBound();
    if (Objects.requireNonNull(boundaryType) == BoundaryType.TIME) {
        // set time boundaries
        sourceBoundaryOptions.put("source.start.publish-time", lowerBoundary);
        sourceBoundaryOptions.put("source.stop.at-publish-time", upperBoundary);
        log.info("Filled in source boundaries options");
    } else {
        log.warn("Not supported boundary type: {}", boundaryType);
    }
}
```

These parameters will be recognized by the PulsarSource, and during the initialization of the PulsarSource, a BoundedStopCursor will be set for the Source.

```java
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    PulsarDeserializationSchema<RowData> deserializationSchema =
            deserializationSchemaFactory.createPulsarDeserialization(context);
    PulsarSourceBuilder<RowData> sourceBuilder = PulsarSource.builder();
    sourceBuilder
            .setTopics(topics)
            .setStartCursor(startCursor)
            .setDeserializationSchema(deserializationSchema)
            .setProperties(properties);
    if (!(stopCursor instanceof NeverStopCursor)) {
        // set the stop cursor
        sourceBuilder.setBoundedStopCursor(stopCursor);
    } else {
        sourceBuilder.setUnboundedStopCursor(stopCursor);
    }
    return SourceProvider.of(sourceBuilder.build());
}
```

If a BoundedStopCursor is configured, the Source’s boundedness property will be set to Boundedness.BOUNDED.

```java
public PulsarSourceBuilder<OUT> setBoundedStopCursor(StopCursor stopCursor) {
    this.boundedness = Boundedness.BOUNDED;
    this.stopCursor = checkNotNull(stopCursor);
    return this;
}
```

This way, the Flink engine can recognize that this is a bounded Source, allowing it to process data using a batch approach.
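This behavior can be illustrated with a minimal, self-contained sketch (an illustration only, not Flink's actual internals): a runtime in automatic mode selects batch execution when the source is bounded, and streaming execution otherwise. The enum and method names below are hypothetical; only Boundedness mirrors the Flink enum shown above.

```java
// Illustration of how an execution mode can be resolved from a source's
// boundedness. Names other than Boundedness are hypothetical.
class BoundednessDemo {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    enum ExecutionMode { BATCH, STREAMING }

    // A bounded source allows batch execution; an unbounded one forces streaming.
    static ExecutionMode resolveMode(Boundedness boundedness) {
        return boundedness == Boundedness.BOUNDED
                ? ExecutionMode.BATCH
                : ExecutionMode.STREAMING;
    }

    public static void main(String[] args) {
        System.out.println(resolveMode(Boundedness.BOUNDED));              // BATCH
        System.out.println(resolveMode(Boundedness.CONTINUOUS_UNBOUNDED)); // STREAMING
    }
}
```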

Offline Sync Scheduling

Offline synchronization is based on Flink batch jobs and can be scheduled at regular intervals. Each Flink batch job is triggered by the scheduling system. InLong has a built-in scheduling system based on Quartz, which supports the scheduling of offline tasks.

The overall process of offline synchronization task scheduling is illustrated in the following diagram (Offline Sync Schedule):

  • The user creates an offline synchronization task.
  • After the task is approved, it is registered with the scheduling system via ScheduleClient.
  • The scheduling service periodically generates scheduling instances based on the configuration.
  • Each scheduling instance calls back to InLong’s Schedule Operator to initiate a task execution. The callback carries detailed task information, including the GroupId, StreamId, task start and end boundaries, and other parameters.
  • The Schedule Operator creates a Flink job based on the task information and submits it to the Flink cluster for execution.

Scheduling Engine Extension

InLong’s offline scheduling capability supports third-party scheduling systems. The following sections introduce how to extend the scheduling capability.

Scheduling Task Registration

ScheduleClient is the client for scheduling task registration, allowing users to register tasks with the scheduling system. The ScheduleClient selects the scheduling engine based on the engineType in ScheduleInfo, and users can extend scheduling capabilities by implementing the ScheduleEngineClient interface.

```java
public interface ScheduleEngineClient {
    /**
     * Check whether the schedule engine type matches.
     */
    boolean accept(String engineType);

    /**
     * Register a schedule to the schedule engine.
     *
     * @param scheduleInfo schedule info to register
     */
    boolean register(ScheduleInfo scheduleInfo);

    /**
     * Unregister a schedule from the schedule engine.
     *
     * @param groupId group to unregister the schedule info for
     */
    boolean unregister(String groupId);

    /**
     * Update a schedule in the schedule engine.
     *
     * @param scheduleInfo schedule info to update
     */
    boolean update(ScheduleInfo scheduleInfo);
}
```

ScheduleEngineClient provides the capability to register, unregister, and update scheduling tasks, allowing users to implement these interfaces according to their needs.
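As a sketch of such an extension, the following hypothetical client targets a third-party engine identified as "airflow". The ScheduleInfo and ScheduleEngineClient definitions below are simplified stand-ins for the real InLong types, and the in-memory map stands in for calls to the external engine's API:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for InLong's ScheduleInfo, reduced to two fields.
class ScheduleInfo {
    String groupId;
    String crontabExpression;

    ScheduleInfo(String groupId, String crontabExpression) {
        this.groupId = groupId;
        this.crontabExpression = crontabExpression;
    }
}

// Simplified stand-in for the ScheduleEngineClient interface shown above.
interface ScheduleEngineClient {
    boolean accept(String engineType);
    boolean register(ScheduleInfo scheduleInfo);
    boolean unregister(String groupId);
    boolean update(ScheduleInfo scheduleInfo);
}

// Hypothetical client for a third-party engine; a real implementation would
// call the engine's API instead of mutating a local map.
class AirflowScheduleEngineClient implements ScheduleEngineClient {

    private final Map<String, ScheduleInfo> registered = new HashMap<>();

    @Override
    public boolean accept(String engineType) {
        return "airflow".equalsIgnoreCase(engineType);
    }

    @Override
    public boolean register(ScheduleInfo scheduleInfo) {
        registered.put(scheduleInfo.groupId, scheduleInfo);
        return true;
    }

    @Override
    public boolean unregister(String groupId) {
        return registered.remove(groupId) != null;
    }

    @Override
    public boolean update(ScheduleInfo scheduleInfo) {
        if (!registered.containsKey(scheduleInfo.groupId)) {
            return false;
        }
        registered.put(scheduleInfo.groupId, scheduleInfo);
        return true;
    }
}
```

In this sketch, ScheduleClient would select the client whose accept() matches the engineType configured in ScheduleInfo.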

Scheduling Task Execution

The execution of scheduling tasks relies on the scheduling service, which periodically generates scheduling instances based on the scheduling configuration and then calls back to InLong’s Schedule Operator to initiate a task execution. Using the built-in Quartz scheduling service as an example, the following shows how the scheduling system periodically triggers offline synchronization tasks.

```java
public interface ScheduleEngine {
    /**
     * Start the schedule engine.
     */
    void start();

    /**
     * Handle schedule registration.
     *
     * @param scheduleInfo schedule info to register
     */
    boolean handleRegister(ScheduleInfo scheduleInfo);

    /**
     * Handle schedule unregistration.
     *
     * @param groupId group to unregister the schedule info for
     */
    boolean handleUnregister(String groupId);

    /**
     * Handle schedule update.
     *
     * @param scheduleInfo schedule info to update
     */
    boolean handleUpdate(ScheduleInfo scheduleInfo);

    /**
     * Stop the schedule engine.
     */
    void stop();
}
```

QuartzScheduleEngine provides a Scheduler that offers capabilities for starting, stopping, registering, unregistering, and updating scheduling tasks in response to requests from ScheduleEngineClient.

Currently, QuartzScheduleEngine supports periodic scheduling based on scheduling cycle configurations and crontab expressions. Each periodic scheduling instance includes trigger time, cycle, and other relevant information, which is used to initiate InLong data synchronization tasks.

Each scheduling instance corresponds to a QuartzOfflineSyncJob, which sends an OfflineJobRequest to the Manager.

```java
public class OfflineJobRequest {

    @ApiModelProperty("Inlong Group ID")
    @NotNull
    private String groupId;

    @ApiModelProperty("Source boundary type, TIME and OFFSET are supported")
    @NotNull
    private String boundaryType;

    @ApiModelProperty("The lower bound for bounded source")
    @NotNull
    private String lowerBoundary;

    @ApiModelProperty("The upper bound for bounded source")
    @NotNull
    private String upperBoundary;
}
```

OfflineJobRequest carries the GroupId, the boundary type, and the task’s start and end boundaries.

When extending a third-party scheduling engine, users likewise need to construct an OfflineJobRequest within the scheduling instance and send the task execution request to the Manager.
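As a sketch, a scheduling instance with TIME boundaries might derive its window from the trigger time and the schedule cycle, with the window ending at the trigger time and starting one cycle earlier. This derivation is an assumption for illustration; the real boundary values come from the scheduling instance's configuration, and the helper below is hypothetical:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: derive TIME boundaries for an OfflineJobRequest
// from the scheduling instance's trigger time and cycle.
class BoundaryCalculator {

    // Returns {lowerBoundary, upperBoundary} as epoch-millisecond strings,
    // matching the string-typed boundary fields of OfflineJobRequest.
    static String[] timeBoundaries(Instant triggerTime, Duration cycle) {
        long upper = triggerTime.toEpochMilli();
        long lower = triggerTime.minus(cycle).toEpochMilli();
        return new String[] {String.valueOf(lower), String.valueOf(upper)};
    }

    public static void main(String[] args) {
        // A daily task triggered at epoch-millisecond 86_400_000 covers the
        // preceding 24 hours.
        String[] bounds = timeBoundaries(Instant.ofEpochMilli(86_400_000L), Duration.ofDays(1));
        System.out.println(bounds[0] + " -> " + bounds[1]); // 0 -> 86400000
    }
}
```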

Summary

This article primarily describes methods for extending offline data synchronization capabilities, including how to enhance offline synchronization based on real-time data sources and how to support third-party scheduling engines.