Producer Example

1 Producer Example

TubeMQ provides two ways to initialize session factory, TubeSingleSessionFactory and TubeMultiSessionFactory:

  • TubeSingleSessionFactory creates only one session in the lifecycle, this is very useful in streaming scenarios.
  • TubeMultiSessionFactory creates new session on every call.

1.1 TubeSingleSessionFactory

1.1.1 Send Message Synchronously

  1. ```java
  2. public final class SyncProducerExample {
  3. public static void main(String[] args) throws Throwable {
  4. final String masterHostAndPort = "localhost:8000";
  5. final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
  6. final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
  7. final MessageProducer messageProducer = messageSessionFactory.createProducer();
  8. final String topic = "test";
  9. final String body = "This is a test message from single-session-factory!";
  10. byte[] bodyData = StringUtils.getBytesUtf8(body);
  11. messageProducer.publish(topic);
  12. Message message = new Message(topic, bodyData);
  13. MessageSentResult result = messageProducer.sendMessage(message);
  14. if (result.isSuccess()) {
  15. System.out.println("sync send message : " + message);
  16. }
  17. messageProducer.shutdown();
  18. }
  19. }
  20. ```

1.1.2 Send Message Asynchronously

  1. ```java
  2. public final class AsyncProducerExample {
  3. public static void main(String[] args) throws Throwable {
  4. final String masterHostAndPort = "localhost:8000";
  5. final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
  6. final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
  7. final MessageProducer messageProducer = messageSessionFactory.createProducer();
  8. final String topic = "test";
  9. final String body = "async send message from single-session-factory!";
  10. byte[] bodyData = StringUtils.getBytesUtf8(body);
  11. messageProducer.publish(topic);
  12. final Message message = new Message(topic, bodyData);
  13. messageProducer.sendMessage(message, new MessageSentCallback(){
  14. @Override
  15. public void onMessageSent(MessageSentResult result) {
  16. if (result.isSuccess()) {
  17. System.out.println("async send message : " + message);
  18. } else {
  19. System.out.println("async send message failed : " + result.getErrMsg());
  20. }
  21. }
  22. @Override
  23. public void onException(Throwable e) {
  24. System.out.println("async send message error : " + e);
  25. }
  26. });
  27. messageProducer.shutdown();
  28. }
  29. }
  30. ```

1.1.3 Send Message With Attributes

  1. ```java
  2. public final class ProducerWithAttributeExample {
  3. public static void main(String[] args) throws Throwable {
  4. final String masterHostAndPort = "localhost:8000";
  5. final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
  6. final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
  7. final MessageProducer messageProducer = messageSessionFactory.createProducer();
  8. final String topic = "test";
  9. final String body = "send message with attribute from single-session-factory!";
  10. byte[] bodyData = StringUtils.getBytesUtf8(body);
  11. messageProducer.publish(topic);
  12. Message message = new Message(topic, bodyData);
  13. //set attribute
  14. message.setAttrKeyVal("test_key", "test value");
  15. //msgType is used for consumer filtering, and msgTime(accurate to minute) is used as the pipe to send and receive statistics
  16. SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
  17. message.putSystemHeader("test", sdf.format(new Date()));
  18. messageProducer.sendMessage(message);
  19. messageProducer.shutdown();
  20. }
  21. }
  22. ```

1.2 TubeMultiSessionFactory

  1. ```java
  2. public class MultiSessionProducerExample {
  3. public static void main(String[] args) throws Throwable {
  4. final int SESSION_FACTORY_NUM = 10;
  5. final String masterHostAndPort = "localhost:8000";
  6. final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
  7. final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>(SESSION_FACTORY_NUM);
  8. final ExecutorService sendExecutorService = Executors.newFixedThreadPool(SESSION_FACTORY_NUM);
  9. final CountDownLatch latch = new CountDownLatch(SESSION_FACTORY_NUM);
  10. for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
  11. TubeMultiSessionFactory tubeMultiSessionFactory = new TubeMultiSessionFactory(clientConfig);
  12. sessionFactoryList.add(tubeMultiSessionFactory);
  13. MessageProducer producer = tubeMultiSessionFactory.createProducer();
  14. Sender sender = new Sender(producer, latch);
  15. sendExecutorService.submit(sender);
  16. }
  17. latch.await();
  18. sendExecutorService.shutdownNow();
  19. for (MessageSessionFactory sessionFactory : sessionFactoryList) {
  20. sessionFactory.shutdown();
  21. }
  22. }
  23. private static class Sender implements Runnable {
  24. private MessageProducer producer;
  25. private CountDownLatch latch;
  26. public Sender(MessageProducer producer, CountDownLatch latch) {
  27. this.producer = producer;
  28. this.latch = latch;
  29. }
  30. @Override
  31. public void run() {
  32. final String topic = "test";
  33. try {
  34. producer.publish(topic);
  35. final byte[] bodyData = StringUtils.getBytesUtf8("This is a test message from multi-session factory");
  36. Message message = new Message(topic, bodyData);
  37. producer.sendMessage(message);
  38. producer.shutdown();
  39. } catch (Throwable ex) {
  40. System.out.println("send message error : " + ex);
  41. } finally {
  42. latch.countDown();
  43. }
  44. }
  45. }
  46. }
  47. ```