4 批量消息样例

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

4.1 发送批量消息

如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:

  1. String topic = "BatchTest";
  2. List<Message> messages = new ArrayList<>();
  3. messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
  4. messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
  5. messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
  6. try {
  7. producer.send(messages);
  8. } catch (Exception e) {
  9. e.printStackTrace();
  10. //处理error
  11. }

4.2 消息列表分割

复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:

  1. public class ListSplitter implements Iterator<List<Message>> {
  2. private final int SIZE_LIMIT = 1024 * 1024 * 4;
  3. private final List<Message> messages;
  4. private int currIndex;
  5. public ListSplitter(List<Message> messages) {
  6. this.messages = messages;
  7. }
  8. @Override public boolean hasNext() {
  9. return currIndex < messages.size();
  10. }
  11. @Override public List<Message> next() {
  12. int startIndex = getStartIndex();
  13. int nextIndex = startIndex;
  14. int totalSize = 0;
  15. for (; nextIndex < messages.size(); nextIndex++) {
  16. Message message = messages.get(nextIndex);
  17. int tmpSize = calcMessageSize(message);
  18. if (tmpSize + totalSize > SIZE_LIMIT) {
  19. break;
  20. } else {
  21. totalSize += tmpSize;
  22. }
  23. }
  24. List<Message> subList = messages.subList(startIndex, nextIndex);
  25. currIndex = nextIndex;
  26. return subList;
  27. }
  28. private int getStartIndex() {
  29. Message currMessage = messages.get(currIndex);
  30. int tmpSize = calcMessageSize(currMessage);
  31. while(tmpSize > SIZE_LIMIT) {
  32. currIndex += 1;
  33. Message message = messages.get(curIndex);
  34. tmpSize = calcMessageSize(message);
  35. }
  36. return currIndex;
  37. }
  38. private int calcMessageSize(Message message) {
  39. int tmpSize = message.getTopic().length() + message.getBody().length();
  40. Map<String, String> properties = message.getProperties();
  41. for (Map.Entry<String, String> entry : properties.entrySet()) {
  42. tmpSize += entry.getKey().length() + entry.getValue().length();
  43. }
  44. tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节
  45. return tmpSize;
  46. }
  47. }
  48. //把大的消息分裂成若干个小的消息
  49. ListSplitter splitter = new ListSplitter(messages);
  50. while (splitter.hasNext()) {
  51. try {
  52. List<Message> listItem = splitter.next();
  53. producer.send(listItem);
  54. } catch (Exception e) {
  55. e.printStackTrace();
  56. //处理error
  57. }
  58. }