Storm 分布式RPC
发布日期:2021-08-28 04:40:45 浏览次数:1 分类:技术文章

本文共 3271 字,大约阅读时间需要 10 分钟。

hot3.png

关于这部分的基本概念,大家自行上网上查资料去,这里我直接上例子。

首先需要在storm集群上把DRPC的环境准备好,在storm.yaml当中增加如下内容

 drpc.servers:

  - "192.168.1.118"

之后通过storm drpc启动分布式RPC服务。

之后,跟其他的topology并没有什么不同,我们需要写点代码,我这边直接从storm的例子当中找了个:

public class BasicDRPCTopology {

    public static class ExclaimBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String input = tuple.getString(1);
            collector.emit(new Values(tuple.getValue(0), input + "!"));
        }

        @Override

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }

    }

    public static void main(String[] args) throws Exception {

        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
        builder.addBolt(new ExclaimBolt(), 3);

        Config conf = new Config();

        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar("DRCP-TEST", conf, builder.createRemoteTopology());
    }
}

从main函数开始,简单解释一下:

首先new一个LinearDRPCTopologyBuilder对象,其中的参数【exclamation】就是我们在执行rpc调用时候的方法名。

之后我们加入一个自己的bolt,并行数量为3

之后用StormSubmitter把这个topology提交上去就行了。

代码完成之后,打一个jar包,用storm jar把topology提交到集群上。

客户端调用,非常简单

        DRPCClient client = new DRPCClient("192.168.1.118", 3772);

        String result = client.execute("exclamation", "china");
        System.out.println(result);

到此为止,一个最简单的DRPC调用的工作已经完成了。

等等,还有点问题,LinearDRPCTopologyBuilder 这个东西是不建议使用的(我这里的版本是0.9.3)。

源码上有这么一行:

Trident subsumes the functionality provided by this class, so it's deprecated

大概意思就是trident这个东西已经包含了LinearDRPCTopologyBuilder 当中的功能。

trident是什么意思?翻译了一下,【三叉戟】,靠,看起来很牛逼的样子。必须试试。

那么上第二份代码:

public class TridentDRPCTopology {

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        StormSubmitter.submitTopologyWithProgressBar("word-count", conf, buildTopology());
    }

    public static StormTopology buildTopology() {

        TridentTopology topology = new TridentTopology();

        topology.newDRPCStream("word-count").

                each(new Fields("args"), new Split(), new Fields("word")).
                groupBy(new Fields("word")).
                aggregate(new One(), new Fields("one")).
                aggregate(new Fields("one"), new Sum(), new Fields("word-count"));
        return topology.build();
    }

    public static class Split extends BaseFunction {

        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    public static class One implements CombinerAggregator<Integer> {

        @Override
        public Integer init(TridentTuple tuple) {
            return 1;
        }

        @Override

        public Integer combine(Integer val1, Integer val2) {
            return 1;
        }

        @Override

        public Integer zero() {
            return 1;
        }
    }
}

这个topology的功能要稍稍复杂一些,给出一句话,查一下一共有多少个词,当然了,不能重复计数。main函数当中非常简单,提交一个topology。而这个topology的构建过程是在buildTopology当中完成的。

        topology.newDRPCStream("word-count").

                each(new Fields("args"), new Split(), new Fields("word")).    //用空格分词
                groupBy(new Fields("word")).    //分组
                aggregate(new One(), new Fields("one")).    //给每组的数量设定为1
                aggregate(new Fields("one"), new Sum(), new Fields("word-count"));    //sum计算总和

这样的方式看起来跟spark当中对RDD的操作是有些像的。

好了,还是打包,提交。

然后是客户端测试:

        DRPCClient client = new DRPCClient("192.168.1.118", 3772);

        String result = client.execute("word-count", "mywife asdf asdf asdfasdfasfweqw saaa weweew");
        System.out.println(result);

 

转载于:https://my.oschina.net/dongtianxi/blog/785687

转载地址:https://blog.csdn.net/weixin_33940102/article/details/92446665 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:oracle中substr函数的用法
下一篇:100-100

发表评论

最新留言

网站不错 人气很旺了 加油
[***.192.178.218]2024年03月20日 13时41分39秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

java单元测试断言_单元测试+断言 2019-04-21
java 创建压缩包_用Java创建ZIP压缩文件 2019-04-21
java typedarray_TintTypedArray.java 2019-04-21
java字符字面量_java – 字符串字面量的行为是令人困惑的 2019-04-21
php判断数组的值是否为空,PHP判断数组是否为空的常用方法(五种方法) 2019-04-21
php 读数据库,PHP数据库 2019-04-21
PHP能不能下载报表,PHP生成Excel报表的方法 2019-04-21
php mht2html,PHP 处理 mht 文件 2019-04-21
rt2tr matlab,MATLAB机器人工具箱参考 2019-04-21
MATLAB中GUI界面弹出菜单的使用,Matlab GUIDE使用说明(Matlab GUI界面) 2019-04-21
win iis对比apache php,服务器Apache与IIS的区别 2019-04-21
怎样用xampp测试php环境变量,使用xampp配置php运行环境的方法 2019-04-21
qq互联php教程,thinkphp5怎么整合qq互联登录教程 2019-04-21
Java怎么比较4数字大小,怎么判断四个数不成比例-判断4个数值相等-数学-古残夷同学... 2019-04-21
mysql建立索引 性能测试_MySQL分区和索引性能测试 2019-04-21
数据结构java实验 刘小晶_数据结构实例解析与实验指导:Java语言描述 2019-04-21
java实现 k nn算法_java-C中的k-NN示例问题(OpenCV) 2019-04-21
java接口的理解_Java接口的理解 - rabbit_mom的个人空间 - OSCHINA - 中文开源技术交流社区... 2019-04-21
java重用名快捷键_Eclipse 最常用的 10 组快捷键,个个牛逼! 2019-04-21
java中类加载根路径_java中获取类加载路径和项目根路径的5种方法 2019-04-21