Kafka

Apache KafkaKafka - 图1open in new window 是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

示例代码

kafka 生产者生产数据 Java 代码示例

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "127.0.0.1:9092");
  3. props.put("key.serializer", StringSerializer.class);
  4. props.put("value.serializer", StringSerializer.class);
  5. KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  6. producer.send(
  7. new ProducerRecord<>(
  8. "Kafka-Test", "key", "root.kafka," + System.currentTimeMillis() + ",value,INT32,100"));
  9. producer.close();

kafka 消费者接收数据 Java 代码示例

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "127.0.0.1:9092");
  3. props.put("key.deserializer", StringDeserializer.class);
  4. props.put("value.deserializer", StringDeserializer.class);
  5. props.put("auto.offset.reset", "earliest");
  6. props.put("group.id", "Kafka-Test");
  7. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
  8. kafkaConsumer.subscribe(Collections.singleton("Kafka-Test"));
  9. ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));

存入 IoTDB 服务器的 Java 代码示例

  1. SessionPool pool =
  2. new SessionPool.Builder()
  3. .host("127.0.0.1")
  4. .port(6667)
  5. .user("root")
  6. .password("root")
  7. .maxSize(3)
  8. .build();
  9. List<String> datas = new ArrayList<>(records.count());
  10. for (ConsumerRecord<String, String> record : records) {
  11. datas.add(record.value());
  12. }
  13. int size = datas.size();
  14. List<String> deviceIds = new ArrayList<>(size);
  15. List<Long> times = new ArrayList<>(size);
  16. List<List<String>> measurementsList = new ArrayList<>(size);
  17. List<List<TSDataType>> typesList = new ArrayList<>(size);
  18. List<List<Object>> valuesList = new ArrayList<>(size);
  19. for (String data : datas) {
  20. String[] dataArray = data.split(",");
  21. String device = dataArray[0];
  22. long time = Long.parseLong(dataArray[1]);
  23. List<String> measurements = Arrays.asList(dataArray[2].split(":"));
  24. List<TSDataType> types = new ArrayList<>();
  25. for (String type : dataArray[3].split(":")) {
  26. types.add(TSDataType.valueOf(type));
  27. }
  28. List<Object> values = new ArrayList<>();
  29. String[] valuesStr = dataArray[4].split(":");
  30. for (int i = 0; i < valuesStr.length; i++) {
  31. switch (types.get(i)) {
  32. case INT64:
  33. values.add(Long.parseLong(valuesStr[i]));
  34. break;
  35. case DOUBLE:
  36. values.add(Double.parseDouble(valuesStr[i]));
  37. break;
  38. case INT32:
  39. values.add(Integer.parseInt(valuesStr[i]));
  40. break;
  41. case TEXT:
  42. values.add(valuesStr[i]);
  43. break;
  44. case FLOAT:
  45. values.add(Float.parseFloat(valuesStr[i]));
  46. break;
  47. case BOOLEAN:
  48. values.add(Boolean.parseBoolean(valuesStr[i]));
  49. break;
  50. }
  51. }
  52. deviceIds.add(device);
  53. times.add(time);
  54. measurementsList.add(measurements);
  55. typesList.add(types);
  56. valuesList.add(values);
  57. }
  58. pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);