关于这部分的基本概念,大家自行上网上查资料去,这里我直接上例子。
首先需要在storm集群上把DRPC的环境准备好,在storm.yaml当中增加如下内容
drpc.servers:
- "192.168.1.118"之后通过storm drpc启动分布式RPC服务。
之后,跟其他的topology并没有什么不同,我们需要写点代码,我这边直接从storm的例子当中找了个:
public class BasicDRPCTopology {
public static class ExclaimBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); collector.emit(new Values(tuple.getValue(0), input + "!")); }@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); }}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); builder.addBolt(new ExclaimBolt(), 3);Config conf = new Config();
conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar("DRCP-TEST", conf, builder.createRemoteTopology()); } }从main函数开始,简单解释一下:
首先new一个LinearDRPCTopologyBuilder对象,其中的参数【exclamation】就是我们在执行rpc调用时候的方法名。
之后我们加入一个自己的bolt,并行数量为3
之后用StormSubmitter把这个topology提交上去就行了。
代码完成之后,打一个jar包,用storm jar把topology提交到集群上。
客户端调用,非常简单
DRPCClient client = new DRPCClient("192.168.1.118", 3772);
String result = client.execute("exclamation", "china"); System.out.println(result);到此为止,一个最简单的DRPC调用的工作已经完成了。
等等,还有点问题,LinearDRPCTopologyBuilder 这个东西是不建议使用的(我这里的版本是0.9.3)。
源码上有这么一行:
Trident subsumes the functionality provided by this class, so it's deprecated
大概意思就是trident这个东西已经包含了LinearDRPCTopologyBuilder 当中的功能。
trident是什么意思?翻译了一下,【三叉戟】,靠,看起来很牛逼的样子。必须试试。
那么上第二份代码:
public class TridentDRPCTopology {
public static void main(String[] args) throws Exception { Config conf = new Config(); StormSubmitter.submitTopologyWithProgressBar("word-count", conf, buildTopology()); }public static StormTopology buildTopology() {
TridentTopology topology = new TridentTopology();topology.newDRPCStream("word-count").
each(new Fields("args"), new Split(), new Fields("word")). groupBy(new Fields("word")). aggregate(new One(), new Fields("one")). aggregate(new Fields("one"), new Sum(), new Fields("word-count")); return topology.build(); }public static class Split extends BaseFunction {
@Override public void execute(TridentTuple tuple, TridentCollector collector) { String sentence = tuple.getString(0); for (String word : sentence.split(" ")) { collector.emit(new Values(word)); } } }public static class One implements CombinerAggregator<Integer> {
@Override public Integer init(TridentTuple tuple) { return 1; }@Override
public Integer combine(Integer val1, Integer val2) { return 1; }@Override
public Integer zero() { return 1; } } }这个topology的功能要稍稍复杂一些,给出一句话,查一下一共有多少个词,当然了,不能重复计数。main函数当中非常简单,提交一个topology。而这个topology的构建过程是在buildTopology当中完成的。
topology.newDRPCStream("word-count").
each(new Fields("args"), new Split(), new Fields("word")). //用空格分词 groupBy(new Fields("word")). //分组 aggregate(new One(), new Fields("one")). //给每组的数量设定为1 aggregate(new Fields("one"), new Sum(), new Fields("word-count")); //sum计算总和这样的方式看起来跟spark当中对RDD的操作是有些像的。
好了,还是打包,提交。
然后是客户端测试:
DRPCClient client = new DRPCClient("192.168.1.118", 3772);
String result = client.execute("word-count", "mywife asdf asdf asdfasdfasfweqw saaa weweew"); System.out.println(result);