Overview

InLong DataProxy mainly provides connection convergence, routing, data compression, and protocol conversion. It acts as a bridge from the InLong Agent to the message queue. When DataProxy pulls the metadata of the data streams from the Manager module, it determines the mapping between each data stream and the corresponding topic name in the message queue. When DataProxy receives a message, it first sends the message to a Memory Channel for compression, and then uses a local Producer to send the data to the back-end cache layer (i.e., the message queue). If sending to the message queue fails, DataProxy caches the message in a Disk Channel on the local disk. The overall architecture of InLong DataProxy is based on Apache Flume, extending the source and sink layers and optimizing disaster-recovery forwarding to improve the stability of the system.

Architecture

(Figure 1: DataProxy overall architecture)

  • The source layer listens on ports, implemented with a Netty server; decoded data is sent on to the channel layer.
  • The channel layer has a selector that chooses which type of channel an event goes to; when the memory channels fill up, data overflows to the disk-backed channels (see the sketch after this list).
  • Data in the channel layer is forwarded through the sink layer, whose main purpose is to convert the data to the TDMsg1 format and push it to the cache layer (TubeMQ is most commonly used here).
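The master/slave failover behavior of the channel layer can be pictured with the following sketch. This is only an illustration of the selection idea, not the actual FailoverChannelSelector source; the BoundedQueue type is a hypothetical stand-in for a capacity-bounded channel.

```java
// Illustrative sketch of the channel layer's failover idea; not the
// actual org.apache.flume.channel.FailoverChannelSelector source code.
import java.util.List;

public class FailoverSketch {

    /** Minimal stand-in for a capacity-bounded channel (hypothetical). */
    public interface BoundedQueue<T> {
        boolean offer(T event);       // returns false when the channel is full
    }

    /**
     * Try master (memory) channels first; when all of them are full,
     * fall back to slave (file) channels so data lands on local disk.
     */
    public static <T> boolean put(List<BoundedQueue<T>> masters,
                                  List<BoundedQueue<T>> slaves, T event) {
        for (BoundedQueue<T> ch : masters) {
            if (ch.offer(event)) {
                return true;          // fast path: a memory channel accepted the event
            }
        }
        for (BoundedQueue<T> ch : slaves) {
            if (ch.offer(event)) {
                return true;          // failover path: disk-backed channel
            }
        }
        return false;                 // every channel is full; caller must handle
    }
}
```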

DataProxy Configuration

DataProxy supports configurable source-channel-sink pipelines, with a configuration file structure similar to Flume's. The configuration file is named like dataproxy-*.conf; currently dataproxy-pulsar.conf and dataproxy-tube.conf are supported to distinguish different message-middleware types, and the specific type can be chosen at startup. When not specified, dataproxy-pulsar.conf is used as the default configuration file.

  • Source configuration example:
  1. agent1.sources.tcp-source.channels = ch-msg1 ch-msg2 ch-msg3 ch-more1 ch-more2 ch-more3 ch-msg5 ch-msg6 ch-msg7 ch-msg8 ch-msg9 ch-msg10 ch-transfer ch-back
  2. Defines the channels used by this source. Note that any channel referenced in the configuration below this source must be declared here.
  3. agent1.sources.tcp-source.type = org.apache.flume.source.SimpleTcpSource
  4. Defines the TCP source type by the class name to instantiate; SimpleTcpSource mainly initializes the configuration and starts listening on the port.
  5. agent1.sources.tcp-source.msg-factory-name = org.apache.flume.source.ServerMessageFactory
  6. The handler factory used for parsing the message structure; it sets up the read-stream and write-stream handlers.
  7. agent1.sources.tcp-source.host = 0.0.0.0
  8. The IP address the TCP source binds to; all network interfaces are bound by default.
  9. agent1.sources.tcp-source.port = 46801
  10. The TCP port to bind; port 46801 is bound by default.
  11. agent1.sources.tcp-source.highWaterMark=2621440
  12. A Netty concept; sets the Netty high-water-mark value.
  13. agent1.sources.tcp-source.enableExceptionReturn=true
  14. New in v1.7, optional, default false. Enables the exception channel: when an exception occurs, the data is written to the exception channel so that it does not block other, normal data transmission (this feature is not included in the open-source version).
  15. agent1.sources.tcp-source.max-msg-length = 524288
  16. Limits the size of a single packet; if a compressed packet is transmitted, this is the compressed size. The limit is 512 KB.
  17. agent1.sources.tcp-source.topic = test_token
  18. The default topic; if no mapping from groupId to topic can be found, messages are sent to this topic.
  19. agent1.sources.tcp-source.attr = m=9
  20. Sets the default value of m, where m is the version of InLong's internal TdMsg protocol.
  21. agent1.sources.tcp-source.connections = 5000
  22. Upper limit of concurrent connections; new connections are rejected when the limit is exceeded.
  23. agent1.sources.tcp-source.max-threads = 64
  24. Upper limit of Netty worker threads; twice the number of CPU cores is generally recommended.
  25. agent1.sources.tcp-source.receiveBufferSize = 524288
  26. Netty server TCP tuning parameter.
  27. agent1.sources.tcp-source.sendBufferSize = 524288
  28. Netty server TCP tuning parameter.
  29. agent1.sources.tcp-source.custom-cp = true
  30. Whether to use the self-developed channel procedure, which can select an alternate channel for sending when the main channel is blocked.
  31. agent1.sources.tcp-source.selector.type = org.apache.flume.channel.FailoverChannelSelector
  32. This is a self-developed channel selector. It differs little from the official Flume one, mainly in the master/slave channel-selection logic.
  33. agent1.sources.tcp-source.selector.master = ch-msg5 ch-msg6 ch-msg7 ch-msg8 ch-msg9
  34. Specifies the master channels, which are preferred when pushing data. Channels that are defined in channels but do not appear in the master, transfer, fileMetric, or slaMetric configuration items are classified as slave channels. A slave channel is selected when the master channels are full; the file channel type is generally recommended for slave channels.
  35. agent1.sources.tcp-source.selector.transfer = ch-msg5 ch-msg6 ch-msg7 ch-msg8 ch-msg9
  36. Specifies the transfer channels, which accept transfer-type data. Transfer here generally refers to data pushed to a non-TubeMQ cluster, forwarded only; this is reserved for future functionality.
  37. agent1.sources.tcp-source.selector.fileMetric = ch-back
  38. Specifies the fileMetric channel, which receives the metric data reported by agents.
  • Channel configuration examples, memory channel:
  1. agent1.channels.ch-more1.type = memory
  2. Memory channel type.
  3. agent1.channels.ch-more1.capacity = 10000000
  4. Memory channel queue size: the maximum number of messages that can be cached.
  5. agent1.channels.ch-more1.keep-alive = 0
  6. How long (in seconds) a put or take waits when the channel is full or empty; 0 means return immediately.
  7. agent1.channels.ch-more1.transactionCapacity = 20
  8. The maximum number of events handled in one atomic (transactional) batch. The memory channel must be locked while in use, so batching improves efficiency; a generic sketch of such transactional batching follows this list.
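For background on what transactionCapacity bounds: a Flume sink drains its channel inside a transaction, and the number of take() calls per transaction cannot exceed transactionCapacity. The following is a generic sketch of that pattern using the standard Flume API, not DataProxy's sink code.

```java
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;

public class BatchTakeSketch {

    /** Drain up to batchSize events from the channel in one transaction. */
    public static int drain(Channel channel, int batchSize) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        int taken = 0;
        try {
            for (; taken < batchSize; taken++) {
                Event event = channel.take();   // returns null when the channel is empty
                if (event == null) {
                    break;
                }
                // ... send the event downstream here ...
            }
            tx.commit();                        // atomically remove the whole batch
        } catch (Exception e) {
            tx.rollback();                      // events stay in the channel
            throw new RuntimeException(e);
        } finally {
            tx.close();
        }
        return taken;
    }
}
```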
  • Channel configuration examples, file channel:
  1. agent1.channels.ch-msg5.type = file
  2. File channel type.
  3. agent1.channels.ch-msg5.capacity = 100000000
  4. The maximum number of messages that a file channel can cache.
  5. agent1.channels.ch-msg5.maxFileSize = 1073741824
  6. Maximum size of a file channel data file, in bytes.
  7. agent1.channels.ch-msg5.minimumRequiredSpace = 1073741824
  8. The minimum free space on the disk holding the file channel; setting this prevents the disk from filling up.
  9. agent1.channels.ch-msg5.checkpointDir = /data/work/file/ch-msg5/check
  10. File channel checkpoint path.
  11. agent1.channels.ch-msg5.dataDirs = /data/work/file/ch-msg5/data
  12. File channel data path.
  13. agent1.channels.ch-msg5.fsyncPerTransaction = false
  14. Whether to fsync the disk for each atomic operation; false is recommended, otherwise performance suffers.
  15. agent1.channels.ch-msg5.fsyncInterval = 5
  16. The interval for flushing data from memory to disk, in seconds.
  • Sink configuration example:
  1. agent1.sinks.meta-sink-more1.channel = ch-msg1
  2. The name of the upstream channel of this sink.
  3. agent1.sinks.meta-sink-more1.type = org.apache.flume.sink.MetaSink
  4. The sink implementation class; MetaSink pushes data to a TubeMQ cluster.
  5. agent1.sinks.meta-sink-more1.master-host-port-list =
  6. The list of TubeMQ cluster master nodes.
  7. agent1.sinks.meta-sink-more1.send_timeout = 30000
  8. Timeout when sending to TubeMQ, in milliseconds.
  9. agent1.sinks.meta-sink-more1.stat-interval-sec = 60
  10. Interval for sink metric statistics, in seconds.
  11. agent1.sinks.meta-sink-more1.thread-num = 8
  12. Number of worker threads the sink uses to send messages; 8 starts eight concurrent sending threads.
  13. agent1.sinks.meta-sink-more1.client-id-cache = true
  14. Client-id cache, used to deduplicate the data reported by agents (see the sketch after this list).
  15. agent1.sinks.meta-sink-more1.max-survived-time = 300000
  16. Maximum time an entry survives in the cache, in milliseconds.
  17. agent1.sinks.meta-sink-more1.max-survived-size = 3000000
  18. Maximum number of entries in the cache.
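To picture how client-id-cache, max-survived-time, and max-survived-size fit together, here is an illustrative, hypothetical dedup cache bounded by age and size. It is not the MetaSink implementation, just a sketch of the idea.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative age- and size-bounded dedup cache; not the MetaSink code. */
public class ClientIdCacheSketch {
    private final long maxSurvivedTimeMs;   // e.g. 300000 (max-survived-time)
    private final int maxSurvivedSize;      // e.g. 3000000 (max-survived-size)
    private final LinkedHashMap<String, Long> seen;

    public ClientIdCacheSketch(long maxSurvivedTimeMs, int maxSurvivedSize) {
        this.maxSurvivedTimeMs = maxSurvivedTimeMs;
        this.maxSurvivedSize = maxSurvivedSize;
        // access-order LinkedHashMap evicts the eldest entry past the size bound
        this.seen = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                return size() > ClientIdCacheSketch.this.maxSurvivedSize;
            }
        };
    }

    /** Returns true when this message id was already seen recently (a duplicate). */
    public synchronized boolean isDuplicate(String clientMessageId) {
        long now = System.currentTimeMillis();
        Long lastSeen = seen.get(clientMessageId);
        if (lastSeen != null && now - lastSeen <= maxSurvivedTimeMs) {
            return true;              // within the survival window: drop as duplicate
        }
        seen.put(clientMessageId, now);   // record (or refresh) the id
        return false;
    }
}
```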

Monitor Metrics Configuration

DataProxy provides monitoring metrics based on JMX; users can implement code that reads the metrics and reports them to a user-defined monitoring system. The source and sink modules can add a monitor metric class that is a subclass of org.apache.inlong.commons.config.metrics.MetricItemSet and register it to the MBeanServer. A user-defined plugin can then obtain the module metrics via JMX and report the metric data to different monitoring systems.

Users can describe the configuration in the file “common.properties”. For example:

  1. metricDomains=DataProxy
  2. metricDomains.DataProxy.domainListeners=org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
  3. metricDomains.DataProxy.snapshotInterval=60000
  • The JMX domain name of DataProxy is “DataProxy”; it is defined by the parameter “metricDomains”.
  • The listeners of a JMX domain are defined by the parameter “metricDomains.$domainName.domainListeners”.
  • The class names of the listeners are separated by spaces.
  • Each listener class needs to implement the interface “org.apache.inlong.dataproxy.metrics.MetricListener”.
  • The snapshot interval of the listeners is defined by the parameter “metricDomains.$domainName.snapshotInterval”; the unit is milliseconds.

The method prototype of org.apache.inlong.dataproxy.metrics.MetricListener is:

  1. public void snapshot(String domain, List<MetricItemValue> itemValues);
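A custom listener might look like the sketch below. The import package for MetricItemValue and its getDimensions()/getMetrics() accessors are assumptions inferred from the dimension and metric tables that follow; check them against the InLong source for your version.

```java
import java.util.List;
import java.util.Map;

import org.apache.inlong.dataproxy.metrics.MetricItemValue;   // package assumed
import org.apache.inlong.dataproxy.metrics.MetricListener;

/**
 * Minimal sketch of a custom listener that logs every snapshot to stdout.
 * getDimensions()/getMetrics() are assumed accessors; verify them against
 * the MetricItemValue class in your InLong version.
 */
public class StdoutMetricListener implements MetricListener {

    @Override
    public void snapshot(String domain, List<MetricItemValue> itemValues) {
        for (MetricItemValue item : itemValues) {
            // assumed accessors: the dimension map and metric map of the item
            Map<String, String> dims = item.getDimensions();
            System.out.println(domain
                    + " groupId=" + dims.get("inlongGroupId")
                    + " streamId=" + dims.get("inlongStreamId")
                    + " metrics=" + item.getMetrics());
        }
    }
}
```

Such a class (StdoutMetricListener is a hypothetical name) would then be registered through the “metricDomains.DataProxy.domainListeners” parameter shown above.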

The field MetricItemValue.dimensions has these dimensions (the fields of DataProxyMetricItem defined by the annotation “@Dimension”):

| property | description |
|----------|-------------|
| clusterId | DataProxy cluster ID. |
| sourceId | DataProxy source component name. |
| sourceDataId | DataProxy source component data ID; when the source is a TCP source, it is the port number. |
| inlongGroupId | InLong data group ID. |
| inlongStreamId | InLong data stream ID. |
| sinkId | DataProxy sink component name. |
| sinkDataId | DataProxy sink component data ID; when the sink is a Pulsar sink, it is the topic name. |

The field MetricItemValue.metrics has these metrics (the fields of DataProxyMetricItem defined by the annotation “@CountMetric”):

| property | description |
|----------|-------------|
| readSuccessCount | Count of events successfully read from the source component. |
| readSuccessSize | Body size of events successfully read from the source component. |
| readFailCount | Count of events that failed to be read from the source component. |
| readFailSize | Body size of events that failed to be read from the source component. |
| sendCount | Count of events sent to the sink destination. |
| sendSize | Body size of events sent to the sink destination. |
| sendSuccessCount | Count of events successfully sent to the sink destination. |
| sendSuccessSize | Body size of events successfully sent to the sink destination. |
| sendFailCount | Count of events that failed to be sent to the sink destination. |
| sendFailSize | Body size of events that failed to be sent to the sink destination. |
| sinkDuration | Duration in milliseconds between the current time point and the time point of sending to the sink destination. |
| nodeDuration | Duration in milliseconds between the current time point and the time point of getting the event from the source. |
| wholeDuration | Duration in milliseconds between the current time point and the time point the event was generated. |

Monitoring metrics are registered to the MBeanServer; users can append JMX parameters when running DataProxy, and a remote server can then obtain the monitoring metrics via RMI. A minimal JMX client sketch follows the parameter list below.

  1. -Dcom.sun.management.jmxremote
  2. -Djava.rmi.server.hostname=127.0.0.1
  3. -Dcom.sun.management.jmxremote.port=9999
  4. -Dcom.sun.management.jmxremote.authenticate=false
  5. -Dcom.sun.management.jmxremote.ssl=false
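With these flags in place, a remote process can read the metrics over RMI using the standard javax.management client API. The following is a minimal sketch; the host and port match the example flags above, and the DataProxy:* object-name pattern follows from the domain name defined earlier.

```java
import java.util.Set;

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class DataProxyJmxReader {
    public static void main(String[] args) throws Exception {
        // matches -Djava.rmi.server.hostname / -Dcom.sun.management.jmxremote.port above
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // DataProxy metric MBeans live under the "DataProxy" JMX domain
            Set<ObjectName> names = conn.queryNames(new ObjectName("DataProxy:*"), null);
            for (ObjectName name : names) {
                for (MBeanAttributeInfo attr : conn.getMBeanInfo(name).getAttributes()) {
                    if (!attr.isReadable()) {
                        continue;
                    }
                    System.out.println(name + " " + attr.getName() + "="
                            + conn.getAttribute(name, attr.getName()));
                }
            }
        }
    }
}
```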