Real-Time Data Synchronization
Published: 2022-02-14 23:02:48
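Comparison of approaches
Item | Log parsing | SQL query |
---|---|---|
Full-database sync | Yes | No |
Impact on the production database | No | Yes |
Sync constraints | None | Incremental sync only by a numeric timestamp or a primary-key id (using a timestamp column directly requires modifying the source code) |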
- The two approaches
- SQL query
- Approach three: Flink CDC
Log parsing => data sync
- Oracle:
- MySQL: binlog parsing
SQL query => data sync
- flume
Approach
- Read data from: DB, file, hdfs…
- flume writes to DB or hdfs
- flume writes to kafka, to be consumed later
Implementation
- Install flume [, kafka, zookeeper]
- Download flume-ng-sql-source and build its jar
  - Prerequisites: java and maven
  - Enter the flume-ng-sql-source directory and run mvn package
- Copy flume-ng-sql-source-{version}.jar into flume's lib directory
  - flume:
      mkdir -p $FLUME_HOME/plugins.d/sql-source/lib
      mkdir -p $FLUME_HOME/plugins.d/sql-source/libext
      cp flume-ng-sql-source-0.8.jar $FLUME_HOME/plugins.d/sql-source/lib
  - cdh: cp/scp the jar to all nodes
      cp flume-ng-sql-source-1.5.2.jar /opt/cloudera/parcels/CDH-{version}-1.cdh{version}/lib/flume-ng/lib
- Copy the db driver
  - Where to get the db driver
    - download it
    - cdh: under /usr/share/java on the hive server
    - other applications: /data/cloudera/parcels/{new_app}/lib/{new_app}/jars
  - flume: cp mysql-connector-java-5.1.35-bin.jar $FLUME_HOME/plugins.d/sql-source/libext
  - cdh: cp /path/mysql-connector-java-5.1.44-bin.jar /opt/cloudera/parcels/CDH-{version}.cdh{version}/lib/flume-ng/lib
Configure the flume conf file
- Location
  - flume: $FLUME_HOME
  - cdh: /opt/cloudera/parcels/CDH-{version}.cdh{version}/etc/flume-ng/conf.empty
- conf files

MySQL to hdfs, synced by numeric id: flume_mysql_hdfs.conf

    agentmysqlhdfsTest.channels = mysqlhdfschannelTest
    agentmysqlhdfsTest.sources = mysqlhdfssourceTest
    agentmysqlhdfsTest.sinks = mysqhdfslsinkTest

    ###########sql source#################
    # For each of the sources, the type is defined
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.type = org.keedio.flume.source.SQLSource
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.connection.url = jdbc:mysql://{ip}:{port}/{db}

    # Hibernate database connection properties
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.connection.user = {username}
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.connection.password = {pwd}
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.connection.autocommit = true
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.dialect = org.hibernate.dialect.MySQLDialect
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.connection.driver_class = com.mysql.jdbc.Driver

    # Query delay in milliseconds: the query is sent once per interval
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.run.query.delay=2000

    # The status file saves the last row read (it tracks the incremental offset)
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.status.file.path = /tmp/flume
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.status.file.name = agentmysqlhdfsTest.sqlSource.status

    # Custom query
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.start.from = 0
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.custom.query = select avgShowView, avgViewBox, boxInfo, cinemaId, cinemaName,viewInfo, date from maoyan_test WHERE id > $@$
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.batch.size = 6000
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.max.rows = 1000

    # agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    # agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.c3p0.min_size=1
    # agentmysqlhdfsTest.sources.mysqlhdfssourceTest.hibernate.c3p0.max_size=10

    ##############################
    agentmysqlhdfsTest.channels.mysqlhdfschannelTest.type = memory
    agentmysqlhdfsTest.channels.mysqlhdfschannelTest.capacity = 10000
    agentmysqlhdfsTest.channels.mysqlhdfschannelTest.transactionCapacity = 10000
    agentmysqlhdfsTest.channels.mysqlhdfschannelTest.byteCapacityBufferPercentage = 20
    agentmysqlhdfsTest.channels.mysqlhdfschannelTest.byteCapacity = 1600000

    agentmysqlhdfsTest.sinks.mysqhdfslsinkTest.type = hdfs
    agentmysqlhdfsTest.sinks.mysqhdfslsinkTest.hdfs.path = hdfs://nameservice1/test/xiw_su/maoyan
    agentmysqlhdfsTest.sinks.mysqhdfslsinkTest.hdfs.fileType = DataStream
    agentmysqlhdfsTest.sinks.mysqhdfslsinkTest.hdfs.writeFormat = Text
    agentmysqlhdfsTest.sinks.mysqhdfslsinkTest.hdfs.rollSize = 268435456
    agentmysqlhdfsTest.sinks.mysqhdfslsinkTest.hdfs.rollInterval = 0
    agentmysqlhdfsTest.sinks.mysqhdfslsinkTest.hdfs.rollCount = 0

    agentmysqlhdfsTest.sinks.mysqhdfslsinkTest.channel = mysqlhdfschannelTest
    agentmysqlhdfsTest.sources.mysqlhdfssourceTest.channels=mysqlhdfschannelTest
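The `custom.query` in this config ends with `WHERE id > $@$`, and the status file records how far the sync has progressed. The following is a minimal Python sketch of that polling idea, not the actual flume-ng-sql-source code (which is Java). It assumes that `$@$` is replaced with the last saved id, that polling starts from `start.from` when no status file exists, and that each poll returns at most `max.rows` rows. The function names, the plain-text status format, the trimmed two-column `maoyan_test` table, and the in-memory SQLite database standing in for MySQL are all illustrative. As far as I recall, the project's README asks that the incremental field be the first column a custom query returns; the sketch selects `id` first on that assumption, which is worth verifying against the version you build.

```python
import os
import sqlite3
import tempfile

PLACEHOLDER = "$@$"  # marker used in custom.query in the config above


def load_last_id(status_path, start_from):
    """Return the saved offset, or start_from when no status file exists yet."""
    try:
        with open(status_path) as f:
            return int(f.read().strip())
    except FileNotFoundError:
        return start_from


def save_last_id(status_path, last_id):
    """Persist the offset so a restart resumes where the last poll stopped."""
    with open(status_path, "w") as f:
        f.write(str(last_id))


def poll_once(conn, custom_query, status_path, start_from=0, max_rows=1000):
    """Run one incremental query and return the new rows (at most max_rows)."""
    last_id = load_last_id(status_path, start_from)
    sql = custom_query.replace(PLACEHOLDER, str(last_id))
    rows = conn.execute(sql).fetchmany(max_rows)
    if rows:
        # Assumption: the incremental field is the first selected column.
        save_last_id(status_path, rows[-1][0])
    return rows


def run_demo():
    """Poll a three-row table three times with max_rows=2."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE maoyan_test (id INTEGER PRIMARY KEY, cinemaName TEXT)")
    conn.executemany("INSERT INTO maoyan_test VALUES (?, ?)",
                     [(1, "a"), (2, "b"), (3, "c")])
    query = "SELECT id, cinemaName FROM maoyan_test WHERE id > $@$ ORDER BY id"
    with tempfile.TemporaryDirectory() as tmp:
        status = os.path.join(tmp, "sqlSource.status")
        batches = [poll_once(conn, query, status, max_rows=2) for _ in range(3)]
        return batches, load_last_id(status, 0)


if __name__ == "__main__":
    print(run_demo())
```

In the demo the first poll returns ids 1 and 2, the second returns id 3, and the third returns nothing because the saved offset is already 3. The `ORDER BY id` is there so that the last row of each batch carries the highest id seen so far.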
Oracle to kafka, synced by numeric id: flume_oracle_test.conf

    agentoracleTest.channels = channelTest
    agentoracleTest.sources = sourceTest
    agentoracleTest.sinks = sinkTest

    ###########sql source#################
    # For each of the sources, the type is defined
    agentoracleTest.sources.sourceTest.type = org.keedio.flume.source.SQLSource
    agentoracleTest.sources.sourceTest.hibernate.connection.url = jdbc:oracle:thin:@{ip}:{port}/{db}

    # Hibernate database connection properties
    agentoracleTest.sources.sourceTest.hibernate.connection.user = {username}
    agentoracleTest.sources.sourceTest.hibernate.connection.password = {pwd}
    agentoracleTest.sources.sourceTest.hibernate.connection.autocommit = true
    agentoracleTest.sources.sourceTest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
    agentoracleTest.sources.sourceTest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver

    agentoracleTest.sources.sourceTest.run.query.delay=1
    agentoracleTest.sources.sourceTest.status.file.path = /tmp/flume
    agentoracleTest.sources.sourceTest.status.file.name = agentoracleTest.sqlSource.status

    # Custom query
    agentoracleTest.sources.sourceTest.start.from = 0
    agentoracleTest.sources.sourceTest.custom.query = SELECT so_no, cust_no, acc_name, acc_no FROM tmp_xws_test_2 WHERE id > $@$
    agentoracleTest.sources.sourceTest.batch.size = 6000
    agentoracleTest.sources.sourceTest.max.rows = 1000

    agentoracleTest.sources.sourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    agentoracleTest.sources.sourceTest.hibernate.c3p0.min_size=1
    agentoracleTest.sources.sourceTest.hibernate.c3p0.max_size=10

    ##############################
    agentoracleTest.channels.channelTest.type = memory
    agentoracleTest.channels.channelTest.capacity = 10000
    agentoracleTest.channels.channelTest.transactionCapacity = 10000
    agentoracleTest.channels.channelTest.byteCapacityBufferPercentage = 20
    agentoracleTest.channels.channelTest.byteCapacity = 1600000

    agentoracleTest.sinks.sinkTest.type = org.apache.flume.sink.kafka.KafkaSink
    agentoracleTest.sinks.sinkTest.topic = OracleTestTopic
    agentoracleTest.sinks.sinkTest.brokerList = {ip}:9092
    agentoracleTest.sinks.sinkTest.requiredAcks = 1
    agentoracleTest.sinks.sinkTest.batchSize = 20

    agentoracleTest.sinks.sinkTest.channel = channelTest
    agentoracleTest.sources.sourceTest.channels=channelTest
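Both conf files repeat long component names many times, so a misspelled channel name is easy to introduce and hard to spot. The sketch below is a hypothetical pre-flight check in Python, not part of flume: it parses the properties text and reports sources or sinks bound to a channel that the agent never declares. It also flags a `transactionCapacity` larger than `capacity`, which, as I understand the memory channel, flume rejects at startup. `parse_properties`, `check_wiring`, and the trimmed `SAMPLE` text are illustrative names and data.

```python
def parse_properties(text):
    """Parse 'key = value' lines, skipping blanks and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []

    def names(kind):
        # "<agent>.sources = a b" declares the component names, space separated.
        return props.get(f"{agent}.{kind}", "").split()

    channels = names("channels")
    for src in names("sources"):
        bound = props.get(f"{agent}.sources.{src}.channels", "").split()
        if not bound:
            problems.append(f"source {src} has no channels")
        for ch in bound:
            if ch not in channels:
                problems.append(f"source {src} uses undeclared channel {ch}")
    for sink in names("sinks"):
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch is None:
            problems.append(f"sink {sink} has no channel")
        elif ch not in channels:
            problems.append(f"sink {sink} uses undeclared channel {ch}")
    for ch in channels:
        cap = props.get(f"{agent}.channels.{ch}.capacity")
        tx = props.get(f"{agent}.channels.{ch}.transactionCapacity")
        # Only compare when both values are set explicitly.
        if cap is not None and tx is not None and int(tx) > int(cap):
            problems.append(f"channel {ch}: transactionCapacity {tx} > capacity {cap}")
    return problems


SAMPLE = """
agentoracleTest.channels = channelTest
agentoracleTest.sources = sourceTest
agentoracleTest.sinks = sinkTest
# memory channel sizing
agentoracleTest.channels.channelTest.capacity = 10000
agentoracleTest.channels.channelTest.transactionCapacity = 10000
agentoracleTest.sources.sourceTest.channels = channelTest
agentoracleTest.sinks.sinkTest.channel = channelTest
"""

if __name__ == "__main__":
    print(check_wiring(parse_properties(SAMPLE), "agentoracleTest"))
```

Running the check against the real file before starting the agent is cheaper than reading a startup stack trace; for a full conf you would read the file contents and pass the agent name used as the key prefix.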
Run
- to hdfs: flume-ng agent --conf conf --conf-file /{path}/flume_mysql_hdfs.conf --name agentmysqlhdfsTest -Dflume.root.logger=INFO,console
- to kafka
  - flume-ng: flume-ng agent --conf conf --conf-file /{path}/flume_mysql_test.conf --name agentmysqlTest [-Dflume.root.logger=INFO,console]
  - follow the topic with kafka: kafka-console-consumer --zookeeper ip:2181 --topic MysqlTestTopic [--from-beginning]
- Basic kafka operations
  - list topics: kafka-topics --zookeeper ip:2181 --list
  - create a topic manually: kafka-topics --zookeeper ip:2181 --create --topic {topic_name} --partitions {partitions_number} --replication-factor {replication_number}
  - delete a topic: kafka-topics --delete --zookeeper ip:2181 --topic {topic_name}
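In the `flume-ng agent` commands of this section, the value given to `--name` has to match the agent prefix used for the keys in the conf file (for example `agentmysqlhdfsTest`); a mismatch typically leaves the agent running with nothing configured. The sketch below is a hypothetical Python helper that reads the agent name out of the conf text and assembles the same command line shown for the hdfs case. `agent_names` and `flume_command` are illustrative names, and the helper assumes one agent per conf file.

```python
import shlex


def agent_names(conf_text):
    """Collect agent prefixes from top-level '<agent>.sources|channels|sinks' keys."""
    names = []
    for line in conf_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        parts = line.split("=", 1)[0].strip().split(".")
        if len(parts) == 2 and parts[1] in ("sources", "channels", "sinks"):
            if parts[0] not in names:
                names.append(parts[0])
    return names


def flume_command(conf_path, conf_text, conf_dir="conf"):
    """Build the flume-ng argument list, taking --name from the conf itself."""
    names = agent_names(conf_text)
    if len(names) != 1:
        raise ValueError(f"expected exactly one agent, found {names}")
    return ["flume-ng", "agent",
            "--conf", conf_dir,
            "--conf-file", conf_path,
            "--name", names[0],
            "-Dflume.root.logger=INFO,console"]


if __name__ == "__main__":
    text = (
        "agentmysqlhdfsTest.channels = mysqlhdfschannelTest\n"
        "agentmysqlhdfsTest.sources = mysqlhdfssourceTest\n"
        "agentmysqlhdfsTest.sinks = mysqhdfslsinkTest\n"
    )
    # Print a copy-pasteable command; subprocess.run(cmd) would launch it.
    print(shlex.join(flume_command("/tmp/flume_mysql_hdfs.conf", text)))
```

Returning a list rather than a string keeps the arguments safe to hand to `subprocess.run` without shell quoting concerns; `shlex.join` (Python 3.8+) is only used for display.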
Reposted from: https://blog.csdn.net/fish2009122/article/details/105655890