awin/mod-kafka

语言: Java

git: https://github.com/awin/mod-kafka

Vert.x的Kafka模块
Kafka module for Vert.x
README.md (中文)

Green.x Kafka模块

Kafka模块允许接收由其他Vert.x Verticle发布的事件,并将这些事件发送给Kafka代理。

依赖

此模块需要Kafka服务器可用。有关Kafka设置,请参阅http://kafka.apache.org/documentation.html#quickstart。 您需要运行Zookeeper和Kafka服务器。将此模块集成到您的应用程序后,消息已发送到Kafka 使用此方法,您可以通过在控制台中创建Kafka使用者来测试结果,该控制台会在部署模块时侦听您在配置中指定的相同主题。 有关如何创建Kafka控制台消费者的更多信息,请参阅:http://kafka.apache.org/documentation.html#quickstart

name

模块名称是kafka。

组态

部署此模块时,需要提供以下配置:

{
    "address": <address>,
    "metadata.broker.list": <broker.list>,
    "kafka-topic", <kafka-topic>,
    "kafka-partition", <kafka-partition>
    "request.required.acks": <request.required.acks>,
    "serializer.class": <serializer.class>,
}

例如:

{
    "address": "test-address",
    "metadata.broker.list": "localhost:9092",
    "kafka-topic", "test-topic",
    "kafka-partition", "test-partition",
    "request.required.acks": 1,
    "serializer.class": "kafka.serializer.DefaultEncoder"
}

每个参数的详细说明:

  • 地址(必填) - Vert.x的EventBus的地址,您的应用程序已发送该事件,以便稍后由此模块使用。
  • metadata.broker.list(可选) - 以逗号分隔的Kafka代理列表。格式为“host1:port1,host2:port2”。默认值为:localhost:9092
  • kafka-topic(可选) - 要发送Kafka消息的主题的名称。默认为:test-topic
  • kafka-partition(可选) - 发送Kafka消息的特定分区的名称。默认为:test-partition
  • request.required.acks(optional) - 显示Kafka生产者是否需要等到Kafka经纪人收到消息的属性。可能的值为:0,表示生产者永远不会等待来自经纪人的确认;                                               1,这意味着生产者在领导者副本收到数据后得到确认;                                              -1,表示生成器在所有同步副本已收到数据后获得确认。默认值为:1
  • serializer.class(可选) - 消息的序列化程序类。选项是kafka.serializer.DefaultEncoder和kafka.serializer.StringEncoder。 kafka.serializer.DefaultEncoder是默认选项。

可选的StatsD配置

如果您想使用StatsD捕获计时信息,可以启用可选的statsd集成。这将使用优秀的非阻塞java-statsd-client

{
    "statsd.enabled": <statsd.enabled defaut:false>,
    "statsd.host": <statsd.host default: "localhost">,
    "statsd.port": <statsd.port default: 8125>,
    "statsd.prefix": <statsd.prefix default: "vertx.kafka">
}

例如:

{
    "statsd.enabled": true,
    "statsd.host": "localhost",
    "statsd.port": 8125,
    "statsd.prefix": "myapp.prefix"
}

每个参数的详细说明:

  • statsd.enabled(可选) - 布尔字符串,指示是否启用了statds日志记录。默认值为:false
  • statsd.host(可选) - statsd服务器的主机名。默认值为:localhost
  • statsd.post(可选) - statsd服务器的端口。默认值为:8125
  • statsd.prefix(可选) - statsd日志消息的前缀。默认值为:vertx.kafka

用法

您可以在本地测试此模块,只需将其部署在应用程序中,指定必要的配置。 确保在端口9092上本地运行Kafka服务器(请参阅http://kafka.apache.org/documentation.html#quickstart)

  1. cd kafka- [版本]
  2. bin / zookeeper-server-start.sh config / zookeeper.properties
  3. bin / kafka-server-start.sh config / server.properties

然后在您的应用程序中部署mod-kafka模块,如下所示: 例:

        JsonObject config = new JsonObject();
        config.putString("address", "test-address");
        config.putString("metadata.broker.list", "localhost:9092");
        config.putString("kafka-topic", "test-topic");
        config.putString("kafka-partition", "test-partition");
        config.putString("request.required.acks", "1");
        config.putString("serializer.class", "kafka.serializer.StringEncoder");

        container.deployModule("com.zanox~mod-kafka~1.0.0", config);

您可以使用Vert.x的JsonObject格式从应用程序发送消息,其中密钥必须是“payload”字符串,并且值可以是字节数组或字符串。请参阅下面的更多细节:

对于字节数组类型

JsonObject jsonObject = new JsonObject();
jsonObject.putBinary("payload", "your message goes here".getBytes());

对于String类型

JsonObject jsonObject = new JsonObject();
jsonObject.putString("payload", "your message goes here");

对于此用例,您需要在配置中显式指定serializer.class以具有值“kafka.serializer.StringEncoder”。

您还可以为每条消息明确指定主题:

JsonObject jsonObject = new JsonObject();
jsonObject.putString("payload", "your message goes here");
jsonObject.putString("topic", "your_topic");

然后,您可以通过控制台创建使用者来验证您是否在Kafka服务器中收到这些消息:

  1. cd kafka- [版本]
  2. bin / kafka-console-consumer.sh --zookeeper localhost:2181 - topic test - from-beginning

现在您将看到正在消耗的消息。

执照

版权所有2013,ZANOX AG在Apache许可下。见LICENSE

作者:Mariam Hakobyan

特约

  1. 在Github上分割存储库
  2. 创建命名要素分支
  3. 在分支机构中开发您的更改
  4. 为您的更改编写测试(如果适用)
  5. 确保所有测试都通过
  6. 使用Github提交Pull请求

本文使用googletrans自动翻译,仅供参考, 原文来自github.com

en_README.md

Vert.x Kafka Module

Kafka module allows to receive events published by other Vert.x verticles and send those events to Kafka broker.

Dependencies

This module requires a Kafka server to be available. See http://kafka.apache.org/documentation.html#quickstart for Kafka setup.
You need to have Zookeeper & Kafka servers running. After you have this module integrated into your application, and message has been sent to Kafka
using this, you may test the results by creating Kafka consumer in console, which listens to the same topic which you specified in your configuration while deploying the module.
For more information how to create Kafka console consumer see: http://kafka.apache.org/documentation.html#quickstart

Name

The module name is kafka.

Configuration

When deploying this module, you need to provide the following configuration:

{
    "address": <address>,
    "metadata.broker.list": <broker.list>,
    "kafka-topic", <kafka-topic>,
    "kafka-partition", <kafka-partition>
    "request.required.acks": <request.required.acks>,
    "serializer.class": <serializer.class>,
}

For example:

{
    "address": "test-address",
    "metadata.broker.list": "localhost:9092",
    "kafka-topic", "test-topic",
    "kafka-partition", "test-partition",
    "request.required.acks": 1,
    "serializer.class": "kafka.serializer.DefaultEncoder"
}

The detailed description of each parameter:

  • address (mandatory) - The address of Vert.x's EventBus, where the event has been sent by your application in order to be consumed by this module later on.
  • metadata.broker.list (optional) - A comma separated list of Kafka brokers. The format is "host1:port1,host2:port2". Default is: localhost:9092
  • kafka-topic (optional) - The name of the topic where you want to send Kafka message. Default is: test-topic
  • kafka-partition (optional) - The name of specific partition where to send the Kakfa message. Default is: test-partition
  • request.required.acks (optional) - Property to show if Kafka producer needs to wait until the message has been received by Kafka broker. Possible values are: 0, which means that the producer never waits for an acknowledgement from the broker;
    1, which means that the producer gets an acknowledgement after the leader replica has received the data;
    -1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. Default is: 1
  • serializer.class (optional) - The serializer class for messages. Options are kafka.serializer.DefaultEncoder and kafka.serializer.StringEncoder. The kafka.serializer.DefaultEncoder is the default option.

Optional StatsD Configuration

If you would like to capture timing information using StatsD you can enable the optional statsd integration. This will make use of the excellent non-blocking java-statsd-client

{
    "statsd.enabled": <statsd.enabled defaut:false>,
    "statsd.host": <statsd.host default: "localhost">,
    "statsd.port": <statsd.port default: 8125>,
    "statsd.prefix": <statsd.prefix default: "vertx.kafka">
}

For example:

{
    "statsd.enabled": true,
    "statsd.host": "localhost",
    "statsd.port": 8125,
    "statsd.prefix": "myapp.prefix"
}

The detailed description of each parameter:

  • statsd.enabled (optional) - Boolean string indicating whether statds logging is enabled. Default is: false
  • statsd.host (optional) - Hostname of the statsd server. Default is: localhost
  • statsd.post (optional) - Port for the statsd server. Default is: 8125
  • statsd.prefix (optional) - Prefix for statsd log messages. Default is: vertx.kafka

Usage

You can test this module locally, just deploy it in your application specifying necessary configuration.
Make sure you have Kafka server running locally on port 9092 (see http://kafka.apache.org/documentation.html#quickstart)

  1. cd kafka-[version]
  2. bin/zookeeper-server-start.sh config/zookeeper.properties
  3. bin/kafka-server-start.sh config/server.properties

Then deploy mod-kafka module in your application like specified below:
Example:

        JsonObject config = new JsonObject();
        config.putString("address", "test-address");
        config.putString("metadata.broker.list", "localhost:9092");
        config.putString("kafka-topic", "test-topic");
        config.putString("kafka-partition", "test-partition");
        config.putString("request.required.acks", "1");
        config.putString("serializer.class", "kafka.serializer.StringEncoder");

        container.deployModule("com.zanox~mod-kafka~1.0.0", config);

You can send messages from your application in Vert.x's JsonObject format, where the key must be "payload" string, and the value can be either byte array or string. See below for more details:

For Byte Array type

JsonObject jsonObject = new JsonObject();
jsonObject.putBinary("payload", "your message goes here".getBytes());

For String type

JsonObject jsonObject = new JsonObject();
jsonObject.putString("payload", "your message goes here");

For this use case you need to explicitly specify the serializer.class in configuration to have the value "kafka.serializer.StringEncoder".

You can also explicitly specify topic for each message:

JsonObject jsonObject = new JsonObject();
jsonObject.putString("payload", "your message goes here");
jsonObject.putString("topic", "your_topic");

Then you can verify that you receive those messages in Kafka server by creating consumer via console:

  1. cd kafka-[version]
  2. bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Now you will see the messages being consumed.

License

Copyright 2013, ZANOX AG under Apache License. See LICENSE

Author: Mariam Hakobyan

Contributing

  1. Fork the repository on Github
  2. Create a named feature branch
  3. Develop your changes in a branch
  4. Write tests for your change (if applicable)
  5. Ensure all the tests are passing
  6. Submit a Pull Request using Github