[Apache Kafka]代码实例
前提:已经配置好kafka。若未安装,可以参照【Apache Kafka】安装升级指南
已在eclipse里面安装scala插件。Eclipse Kepler中在Help->Eclipse Markectplace中搜索Scalar,然后安装即可。
使用maven构建kafka测试project在eclipse中。
创建topic:在kafka的安装目录下执行bin/kafka-create-topic.sh --zookeeper 192.168.20.99:2181 --replica 1 --partition 1 --topic test
启动consumer:在kafka的安装目录下执行bin/kafka-console-consumer.sh --zookeeper 192.168.20.99:2181 --topic test --from-beginning
pom.xml文件如下:
所有kafka依赖的jar包都在com.sksamuel.kafka下面。其中kafka使用的版本是0.8.0-beta1,kafka是2.10。
[html]
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.iflytek.cpcloud.kafka</groupId>
<artifactId>kafkatest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafkatest</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
</dependency>
<dependency>
<groupId>com.sksamuel.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0-beta1</version>
</dependency>
</dependencies>
</project>
然后写一个kafka producer的测试程序如下:
[java]
package com.iflytek.cpcloud.kafka.kafkatest;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* Test the Kafka Producer
* @author jcsong2
*
*/
public class ProducerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zk.connect", "192.168.20.99:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "192.168.20.99:9092");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (int i = 0; i < 10; i++)
producer.send(new KeyedMessage<String, String>("test", "test" + i));
}
}
在consuemr端可以看到test0到test9十行输出。
再写一个kafka consumer的测试程序如下:
[java]
package com.iflytek.cpcloud.kafka.kafkatest;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerTest extends Thread {
private final ConsumerConnector consumer;
private final String topic;
public static void main(String[] args) {
ConsumerTest consumerThread = new ConsumerTest("test");
consumerThread.start();
}
public ConsumerTest(String topic) {
consumer = kafka.consumer.Consumer
.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", "192.168.20.99:2181");
props.put("group.id", "0");
props.put("zookeeper.session.timeout.ms", "400000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext())
System.out.println(new String(it.next().message()));
}
}
在kafka-console-producer端输入的数
补充:综合编程 , 其他综合 ,
- 更多Apache疑问解答:
- 为什么配置完php之后apache重启就启动不了
- php环境搭建 windows7下如何手动搭建php环境,Apache+php+Mysql
- 我搭建了apache和mysql还有php。我编了一个脚本
- APMServ搭建的apache+php环境问题
- 目前apache最新版本是?可以兼容php5.3.6和mysql 5.5.11.7吗
- Apache PHP Mysql 各自的功能是什么?配合在一起各自如何分工?请用形象的比喻说明下
- 怎么能把php的服务器端apache配置成https?谢谢您了。
- 靠配置apache和php的环境!愁死了!照网上说的一样做可是还不行!我的是php-5.2.17 Apache2.2....
- 我的apache2.2.19也是加载不了PHP,显示不能加载D:/php/php5apache2_2.dll
- win7 home basic家庭版 php+mysql+apache
- apache mysql mysql 配置服务器 php页面显示一片空白 测试页面访问正常 权限apache正确 能显示php文件html
- apache为什么能解释php代码
- 如何把jsp网站放到apache上
- 关于apache poi 中excel 样式的问题
- javaWeb工程提示:org.apache.jasper.JasperException: Unable to compile class for JSP: