实时数据同步
发布日期:2022-02-14 23:02:48 浏览次数:10 分类:技术文章

本文共 7130 字,大约阅读时间需要 23 分钟。

方案比较

log解析 SQL查询
全库同步
影响生产库
同步约束条件 只能按数字型timestamp/主键id增量同步 (若直接用timestamp,需要修改源码)
  • 两个方案的
    • SQL query
  • 方案三:Flink CDC

log解析=>数据同步

  • Oracle:
  • MySQL:binlog解析

SQL查询=>数据同步

  • flume
    • 方案

      • 读取数据:DB,file, hdfs…
      • flume写入DB,hdfs
      • flume写入kafka后期再消费
    • 实施

      • 安装flume[, kafka, zookepper]
      • 下载并生成flume-ng-sql-sourcejar
        • 前提:java and maven
        • 进入flume-ng-sql-source并执行mvn package
      • copy flume-ng-sql-source-{version}.jarflumelib
        • flume
          • mkdir -p $FLUME_HOME/plugins.d/sql-source/lib
          • mkdir -p $FLUME_HOME/plugins.d/sql-source/libext
          • cp flume-ng-sql-source-0.8.jar $FLUME_HOME/plugins.d/sql-source/lib
        • cdh
          • cp/scp jar到所有节点
            cp flume-ng-sql-source-1.5.2.jar /opt/cloudera/parcels/CDH-{version}-1.cdh{version}/lib/flume-ng/lib
      • cp db driver
        • db driver
          • download
          • cdh
            • hive server/usr/share/java
            • 其它应用:/data/cloudera/parcels/{新增应用}/lib/{新增应用}/jars
        • flume: cp mysql-connector-java-5.1.35-bin.jar $FLUME_HOME/plugins.d/sql-source/libext
        • cdh: cp /path/mysql-connector-java-5.1.44-bin.jar /opt/cloudera/parcels/CDH-{version}.cdh{version}/lib/flume-ng/lib
    • 配置flumeconf文件

      • 位置
        • flume$FLUME_HOME
        • cdh/opt/cloudera/parcels/CDH-{version}.cdh{version}/etc/flume-ng/conf.empty
      • conf文件
        • MySQL按数字型id同步hdfsflume_mysql_hdfs.conf
          agentmysqlhdfsTest.channels = mysqlhdfschannelTestagentmysqlhdfsTest.sources = mysqlhdfssourceTestagentmysqlhdfsTest.sinks = mysqhdfslsinkTest###########sql source################## For each Test of the sources, the type is definedagentmysqlhdfsTest.sources.mysqlhdfssourceTest.type = org.keedio.flume.source.SQLSourceagentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.connection.url =  jdbc:mysql://{ip}:{port}/{db}# Hibernate Database connection propertiesagentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.connection.user = {username}agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.connection.password = {pwd}agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.connection.autocommit = trueagentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.dialect = org.hibernate.dialect.MySQLDialectagentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.connection.driver_class = com.mysql.jdbc.Driver# Query delay, each configured milisecond the query will be sent:毫秒级agentmysqlhdfsTest.sources.mysqlhdfssourceTest.run.query.delay=2000# Status file is used to save last readed row:跟踪增量的偏移量agentmysqlhdfsTest.sources.mysqlhdfssourceTest.status.file.path = /tmp/flumeagentmysqlhdfsTest.sources.mysqlhdfssourceTest.status.file.name = agentmysqlhdfsTest.sqlSource.status# Custom queryagentmysqlhdfsTest.sources.mysqlhdfssourceTest.start.from = 0agentmysqlhdfsTest.sources.mysqlhdfssourceTest.custom.query = select avgShowView, avgViewBox, boxInfo, cinemaId, cinemaName,viewInfo, date from maoyan_test WHERE id > $@$agentmysqlhdfsTest.sources.mysqlhdfssourceTest.batch.size = 6000agentmysqlhdfsTest.sources.mysqlhdfssourceTest.max.rows = 1000# agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider# agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.c3p0.min_size=1# agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.c3p0.max_size=10##############################agentmysqlhdfsTest.channels.mysqlhdfschannelTest.type = memoryagentmysqlhdfsTest.channels.mysqlhdfschannelTest.capacity = 10000agentmysqlhdfsTest.channels.mysqlhdfschannelTest.transactionCapacity = 10000agentmysqlhdfsTest.channels.mysqlhdfschannelTest.byteCapacityBufferPercentage = 20agentmysqlhdfsTest.channels.mysqlhdfschannelTest.byteCapacity = 1600000agentmysqlhdfsTest.sinks.mysqhdfslsinkTest.type = hdfsagentmysqlhdfsTest.sinks.mysqhdfslsinkTest.hdfs.path = hdfs://nameservice1/test/xiw_su/maoyanagentmysqlhdfsTest.sinks.mysqhdfslsinkTest.hdfs.fileType = DataStreamagentmysqlhdfsTest.sinks.mysqhdfslsinkTest.hdfs.writeFormat = TextagentmysqlhdfsTest.sinks.mysqhdfslsinkTest.hdfs.rollSize = 268435456agentmysqlhdfsTest.sinks.mysqhdfslsinkTest.hdfs.rollInterval = 0agentmysqlhdfsTest.sinks.mysqhdfslsinkTest.hdfs.rollCount = 0agentmysqlhdfsTest.sinks.mysqhdfslsinkTest.channel = mysqlhdfschannelTestagentmysqlhdfsTest.sources.mysqlhdfssourceTest.channels=mysqlhdfschannelTest
        • Oracle按数字型id同步kafkaflume_oracle_test.conf
          agentoracleTest.channels = channelTestagentoracleTest.sources = sourceTestagentoracleTest.sinks = sinkTest###########sql source################## For each Test of the sources, the type is definedagentoracleTest.sources.sourceTest.type = org.keedio.flume.source.SQLSourceagentoracleTest.sources.sourceTest.hibernate.connection.url = jdbc:oracle:thin:@{ip}:{port}/{db}# Hibernate Database connection propertiesagentoracleTest.sources.sourceTest.hibernate.connection.user = {username}agentoracleTest.sources.sourceTest.hibernate.connection.password = {pwd}agentoracleTest.sources.sourceTest.hibernate.connection.autocommit = trueagentoracleTest.sources.sourceTest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialectagentoracleTest.sources.sourceTest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriveragentoracleTest.sources.sourceTest.run.query.delay=1agentoracleTest.sources.sourceTest.status.file.path = /tmp/flumeagentoracleTest.sources.sourceTest.status.file.name = agentoracleTest.sqlSource.status# Custom queryagentoracleTest.sources.sourceTest.start.from = 0
          agentoracleTest.sources.sourceTest.custom.query = SELECT so_no, cust_no, acc_name, acc_no FROM tmp_xws_test_2 WHERE id > $@$agentoracleTest.sources.sourceTest.batch.size = 6000agentoracleTest.sources.sourceTest.max.rows = 1000agentoracleTest.sources.sourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvideragentoracleTest.sources.sourceTest.hibernate.c3p0.min_size=1agentoracleTest.sources.sourceTest.hibernate.c3p0.max_size=10##############################agentoracleTest.channels.channelTest.type = memoryagentoracleTest.channels.channelTest.capacity = 10000agentoracleTest.channels.channelTest.transactionCapacity = 10000agentoracleTest.channels.channelTest.byteCapacityBufferPercentage = 20agentoracleTest.channels.channelTest.byteCapacity = 1600000agentoracleTest.sinks.sinkTest.type = org.apache.flume.sink.kafka.KafkaSinkagentoracleTest.sinks.sinkTest.topic = OracleTestTopicagentoracleTest.sinks.sinkTest.brokerList = {ip}:9092agentoracleTest.sinks.sinkTest.requiredAcks = 1agentoracleTest.sinks.sinkTest.batchSize = 20agentoracleTest.sinks.sinkTest.channel = channelTestagentoracleTest.sinks.sinkTest.channel = channelTestagentoracleTest.sources.sourceTest.channels=channelTest
    • 运行

      • to hdfsflume-ng agent --conf conf --conf-file /{path}/flume_mysql_hdfs.conf --name agentmysqlhdfsTest -Dflume.root.logger=INFO,console
      • to kafka
        • flume-ngflume-ng agent --conf conf --conf-file /{path}/flume_mysql_test.conf --name agentmysqlTest [-Dflume.root.logger=INFO,console]
        • kafka跟踪:kafka-console-consumer --zookeeper ip:2181 --topic MysqlTestTopic [--from-beginning]
      • kafka基本操作
        • 查看topic列表:kafka-topics --zookeeper ip:2181 --list
        • 手动创建topickafka-topics --zookeeper ip:2181 --create --topic {topic_name} --partitions {partitions_number} --replication-factor {replication_number}
        • 删除topickafka-topics --delete --zookeeper ip:2181 --topic {topic_name}

转载地址:https://blog.csdn.net/fish2009122/article/details/105655890 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:AirFlow之安装
下一篇:数据库概况查阅

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.188.9.105]2022年09月27日 15时25分13秒