Audit 数据格式定义及使用

概述

在 InLong 系统架构中,采集层、汇聚层和分拣层等模块通过 Audit SDK 将审计数据上报至Audit系统。Audit Proxy 组件负责接收这些数据,并将其转换为 AuditData 格式,随后存储在 MQ 消息的消息体中。 接下来,Audit Store 从 MQ 中提取这些数据,并将其持久化至如 ClickHouse 等存储系统。本文将深入分析和讲解 Audit 系统相关的数据协议。

Audit 数据格式定义及使用 - 图1

Audit SDK 原始数据格式

Audit SDK 的原始数据采用 Protobuf 协议进行封装,其中包含请求类型、数据通用头部和数据主体等信息。以下是相关协议的详细描述:

  1. syntax = "proto3";
  2. package org.apache.inlong.audit.protocol;
  3. message BaseCommand {
  4. enum Type {
  5. PING = 0;
  6. PONG = 1;
  7. AUDIT_REQUEST = 2;
  8. AUDIT_REPLY = 3;
  9. }
  10. Type type = 1;
  11. AuditRequest audit_request = 2;
  12. AuditReply audit_reply = 3;
  13. Ping ping = 4;
  14. Pong pong = 5;
  15. }
  16. message Ping {
  17. }
  18. message Pong {
  19. }
  20. message AuditRequest {
  21. uint64 request_id = 1;
  22. AuditMessageHeader msg_header = 2;
  23. repeated AuditMessageBody msg_body = 3;
  24. }
  25. message AuditMessageHeader {
  26. string ip = 1;
  27. string docker_id = 2;
  28. string thread_id = 3;
  29. uint64 sdk_ts = 4;
  30. uint64 packet_id = 5;
  31. }
  32. message AuditMessageBody {
  33. uint64 log_ts = 1;
  34. string inlong_group_id = 2;
  35. string inlong_stream_id = 3;
  36. string audit_id = 4;
  37. uint64 count = 5;
  38. uint64 size = 6;
  39. int64 delay = 7;
  40. string audit_tag = 8;
  41. uint64 audit_version = 9;
  42. }
  43. message AuditReply {
  44. enum RSP_CODE {
  45. SUCCESS = 0;
  46. FAILED = 1;
  47. DISASTER = 2;
  48. }
  49. uint64 request_id = 1;
  50. RSP_CODE rsp_code = 2;
  51. string message = 3;
  52. }

Audit Header

审计数据的头部包含以下机器元数据信息:机器 IP、容器 ID、线程 ID、当前机器时间以及数据包 ID。以下是具体的协议描述:

  1. message AuditMessageHeader {
  2. string ip = 1;
  3. string docker_id = 2;
  4. string thread_id = 3;
  5. uint64 sdk_ts = 4;
  6. uint64 packet_id = 5;
  7. }
  • IP:记录生成或处理数据的机器的 IP 地址;
  • Docker ID:标识数据所属的容器;
  • Thread ID:标识生成或处理数据的线程;
  • SdkTs:记录 SDK 上报数据的机器时间戳;
  • Packet ID:用于标识每个数据包。

这些机器元数据信息的用途在于实现审计数据的去重以及运营监控。通过记录和分析这些信息,可以确保数据的唯一性,同时有助于进行运营监控和故障排查等操作。

Audit Body

审计数据的主体包含以下信息:数据时间、InLong GroupId、InLong StreamId、AuditId、AuditTag、Audit版本、条数、大小以及传输时延。以下是具体的协议描述:

  1. message AuditMessageBody {
  2. uint64 log_ts = 1;
  3. string inlong_group_id = 2;
  4. string inlong_stream_id = 3;
  5. string audit_id = 4;
  6. uint64 count = 5;
  7. uint64 size = 6;
  8. int64 delay = 7;
  9. string audit_tag = 8;
  10. uint64 audit_version = 9;
  11. }
  • LogTs:记录数据生成或处理的时间戳;
  • InLong GroupId:标识数据所属的 InLong 组;
  • InLong StreamId:标识数据所属的 InLong 流;
  • AuditId:用于唯一标识每个审计记录;
  • AuditTag:用于标记审计记录的特定标签;
  • Audit Version:记录审计记录的版本号;
  • Count:记录数据中包含的条目数量;
  • Size:记录数据的大小;
  • Delay:记录数据传输所花费的时间。

这些审计信息的目的是用于对账,以确保数据的完整性和准确性。通过对这些信息进行统计,可以实现数据验证、故障排查以及性能分析等操作。

AuditData 数据格式

Audit Proxy负责接收来自 Audit SDK 发送的 Protobuf 格式数据,并对其进行解析。解析完成后,它将 Audit Header 和Audit Body 组装成一条完整的审计数据,随后将该数据写入消息队列( MQ )。AuditData 代表组装后的数据格式,具体细节如下:

  1. {
  2. "ip": "127.0.0.1",
  3. "dockerId": "1",
  4. "threadId": "1",
  5. "sdkTs": 1727600278000,
  6. "packetId": 1,
  7. "logTs": 1727600278000,
  8. "inlongGroupId": "groupId",
  9. "inlongStreamId": "streamId",
  10. "auditId": "auditId",
  11. "auditTag": "auditTag",
  12. "count": 1,
  13. "size": 1,
  14. "delay": 1,
  15. "auditVersion": 1
  16. }

审计数据存储格式

Audit Store 从消息队列( MQ )中消费 AuditData 审计数据,对其进行协议解析后,将数据持久化至 ClickHouse、MySQL 等存储系统中。具体的存储目标 Schema 如下所示:

ClickHouse 表 Schema

  1. CREATE TABLE apache_inlong_audit.audit_data
  2. (
  3. `log_ts` DateTime COMMENT 'Log timestamp',
  4. `audit_id` String COMMENT 'Audit id',
  5. `inlong_group_id` String COMMENT 'The target inlong group id',
  6. `inlong_stream_id` String COMMENT 'The target inlong stream id',
  7. `audit_tag` String COMMENT 'Audit tag',
  8. `audit_version` Int64 DEFAULT -1 COMMENT 'Audit version',
  9. `ip` String COMMENT 'Client IP',
  10. `docker_id` String COMMENT 'Client docker id',
  11. `thread_id` String COMMENT 'Client thread id',
  12. `sdk_ts` DateTime COMMENT 'SDK timestamp',
  13. `packet_id` Int64 COMMENT 'Packet id',
  14. `count` Int64 COMMENT 'Message count',
  15. `size` Int64 COMMENT 'Message size',
  16. `delay` Int64 COMMENT 'Message delay',
  17. `update_time` DateTime COMMENT 'Update time'
  18. )
  19. ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')
  20. PARTITION BY toDate(log_ts)
  21. ORDER BY (log_ts, audit_id, inlong_group_id, inlong_stream_id, audit_tag, audit_version, ip)
  22. TTL toDateTime(log_ts) + toIntervalDay(8)
  23. SETTINGS index_granularity = 8192

如上所述,该表采用 ReplicatedMergeTree 存储引擎,以实现分布式存储和高可用性。数据将根据 log_ts 列进行分区,并按照( log_ts, audit_id, inlong_group_id, inlong_stream_id, audit_tag, audit_version, ip )的顺序进行存储和管理,以优化查询性能。

MySQL 表 Schema

  1. CREATE TABLE IF NOT EXISTS `audit_data`
  2. (
  3. `id` int(32) NOT NULL PRIMARY KEY AUTO_INCREMENT COMMENT 'Incremental primary key',
  4. `ip` varchar(32) NOT NULL DEFAULT '' COMMENT 'Client IP',
  5. `docker_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Client docker id',
  6. `thread_id` varchar(50) NOT NULL DEFAULT '' COMMENT 'Client thread id',
  7. `sdk_ts` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'SDK timestamp',
  8. `packet_id` BIGINT NOT NULL DEFAULT '0' COMMENT 'Packet id',
  9. `log_ts` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Log timestamp',
  10. `inlong_group_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target inlong group id',
  11. `inlong_stream_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target inlong stream id',
  12. `audit_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id',
  13. `audit_tag` varchar(100) DEFAULT '' COMMENT 'Audit tag',
  14. `audit_version` BIGINT DEFAULT -1 COMMENT 'Audit version',
  15. `count` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message count',
  16. `size` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message size',
  17. `delay` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message delay count',
  18. `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
  19. INDEX group_stream_audit_id (`inlong_group_id`, `inlong_stream_id`, `audit_id`, `log_ts`)
  20. ) ENGINE = InnoDB
  21. DEFAULT CHARSET = UTF8 COMMENT ='Inlong audit data table';

如上所述,该表采用 InnoDB 存储引擎以确保数据一致性。同时,通过使用 INDEX 子句创建了名为 group_stream_audit_id 的索引,该索引涵盖了 inlong_group_id、inlong_stream_id、audit_id 和 log_ts 列。 这一索引的创建有助于提高查询效率,尤其是在基于这些列进行筛选和排序时。