生产者示例

1 Producer 示例

TubeMQ 提供了两种方式来初始化 session factory：TubeSingleSessionFactory 和 TubeMultiSessionFactory。

  • TubeSingleSessionFactory 在整个生命周期只会创建一个 session
  • TubeMultiSessionFactory 每次调用都会创建一个 session

1.1 TubeSingleSessionFactory

1.1.1 Send Message Synchronously

  1. public final class SyncProducerExample {
  2. public static void main(String[] args) throws Throwable {
  3. final String masterHostAndPort = "localhost:8000";
  4. final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
  5. final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
  6. final MessageProducer messageProducer = messageSessionFactory.createProducer();
  7. final String topic = "test";
  8. final String body = "This is a test message from single-session-factory!";
  9. byte[] bodyData = StringUtils.getBytesUtf8(body);
  10. messageProducer.publish(topic);
  11. Message message = new Message(topic, bodyData);
  12. MessageSentResult result = messageProducer.sendMessage(message);
  13. if (result.isSuccess()) {
  14. System.out.println("sync send message : " + message);
  15. }
  16. messageProducer.shutdown();
  17. }
  18. }

1.1.2 Send Message Asynchronously

  1. public final class AsyncProducerExample {
  2. public static void main(String[] args) throws Throwable {
  3. final String masterHostAndPort = "localhost:8000";
  4. final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
  5. final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
  6. final MessageProducer messageProducer = messageSessionFactory.createProducer();
  7. final String topic = "test";
  8. final String body = "async send message from single-session-factory!";
  9. byte[] bodyData = StringUtils.getBytesUtf8(body);
  10. messageProducer.publish(topic);
  11. final Message message = new Message(topic, bodyData);
  12. messageProducer.sendMessage(message, new MessageSentCallback(){
  13. @Override
  14. public void onMessageSent(MessageSentResult result) {
  15. if (result.isSuccess()) {
  16. System.out.println("async send message : " + message);
  17. } else {
  18. System.out.println("async send message failed : " + result.getErrMsg());
  19. }
  20. }
  21. @Override
  22. public void onException(Throwable e) {
  23. System.out.println("async send message error : " + e);
  24. }
  25. });
  26. messageProducer.shutdown();
  27. }

1.1.3 Send Message With Attributes

  1. public final class ProducerWithAttributeExample {
  2. public static void main(String[] args) throws Throwable {
  3. final String masterHostAndPort = "localhost:8000";
  4. final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
  5. final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
  6. final MessageProducer messageProducer = messageSessionFactory.createProducer();
  7. final String topic = "test";
  8. final String body = "send message with attribute from single-session-factory!";
  9. byte[] bodyData = StringUtils.getBytesUtf8(body);
  10. messageProducer.publish(topic);
  11. Message message = new Message(topic, bodyData);
  12. //set attribute
  13. message.setAttrKeyVal("test_key", "test value");
  14. //msgType is used for consumer filtering, and msgTime(accurate to minute) is used as the pipe to send and receive statistics
  15. SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
  16. message.putSystemHeader("test", sdf.format(new Date()));
  17. messageProducer.sendMessage(message);
  18. messageProducer.shutdown();
  19. }

1.2 TubeMultiSessionFactory

  1. public class MultiSessionProducerExample {
  2. public static void main(String[] args) throws Throwable {
  3. final int SESSION_FACTORY_NUM = 10;
  4. final String masterHostAndPort = "localhost:8000";
  5. final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
  6. final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>(SESSION_FACTORY_NUM);
  7. final ExecutorService sendExecutorService = Executors.newFixedThreadPool(SESSION_FACTORY_NUM);
  8. final CountDownLatch latch = new CountDownLatch(SESSION_FACTORY_NUM);
  9. for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
  10. TubeMultiSessionFactory tubeMultiSessionFactory = new TubeMultiSessionFactory(clientConfig);
  11. sessionFactoryList.add(tubeMultiSessionFactory);
  12. MessageProducer producer = tubeMultiSessionFactory.createProducer();
  13. Sender sender = new Sender(producer, latch);
  14. sendExecutorService.submit(sender);
  15. }
  16. latch.await();
  17. sendExecutorService.shutdownNow();
  18. for (MessageSessionFactory sessionFactory : sessionFactoryList) {
  19. sessionFactory.shutdown();
  20. }
  21. }
  22. private static class Sender implements Runnable {
  23. private MessageProducer producer;
  24. private CountDownLatch latch;
  25. public Sender(MessageProducer producer, CountDownLatch latch) {
  26. this.producer = producer;
  27. this.latch = latch;
  28. }
  29. @Override
  30. public void run() {
  31. final String topic = "test";
  32. try {
  33. producer.publish(topic);
  34. final byte[] bodyData = StringUtils.getBytesUtf8("This is a test message from multi-session factory");
  35. Message message = new Message(topic, bodyData);
  36. producer.sendMessage(message);
  37. producer.shutdown();
  38. } catch (Throwable ex) {
  39. System.out.println("send message error : " + ex);
  40. } finally {
  41. latch.countDown();
  42. }
  43. }
  44. }
  45. }

Back to top