Commit 1adf9100 authored by xiebaofa's avatar xiebaofa

阿里binlog采集

parent 4183b23b
package com.miya.ali;
import com.miya.common.SessionClient;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
public class AccountPayKafkaBinlogToHBase {
private static final List<String> RDS_LIST = Arrays.asList("rds-binlog-mbox", "rds-binlog-37",
"rds-binlog-hrhn_zx", "rds-binlog-hrhd_zx", "rds-binlog-qdm", "rds-binlog-watson_zx",
"rds-binlog-hongkong", "rds-binlog-lawson", "rds-binlog-165", "rds-binlog-aeon", "rds-binlog-metro",
"rds-binlog-yh", "rds-binlog-cdfy");
public static String createSourceSql(String rds) {
String CREATE_SOURCE_SQL_TEMPLATE = "CREATE TABLE " + rds.replace('-','_') + " (\n" +
"saasid STRING,\n" +
"marketid STRING,\n" +
"operator_id STRING,\n" +
"out_id STRING,\n" +
"paymentplatform STRING,\n" +
"serveicetype STRING,\n" +
"total_fee STRING,\n" +
"status STRING,\n" +
"trad_desc STRING,\n" +
"`date` STRING,\n" +
"systemdate STRING,\n" +
"fund_bill_list STRING,\n" +
"buyer_logon_id STRING,\n" +
"buyer_user_id STRING,\n" +
"cashier STRING,\n" +
"seller_id STRING,\n" +
"trade_no STRING,\n" +
"is_subscribe STRING,\n" +
"isbalance STRING,\n" +
"posbatch STRING,\n" +
"invoiceno STRING,\n" +
"deductionfee STRING,\n" +
"merchantdiscount STRING,\n" +
"otherdescount STRING,\n" +
"barcode STRING,\n" +
"goodstag STRING,\n" +
"rds_id STRING,\n" +
"rds_name STRING,\n" +
"mqstatus STRING,\n" +
"trade_type STRING,\n" +
"notify_url STRING,\n" +
"currency STRING,\n" +
"`day` STRING,\n" +
"settlementid STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = '"+rds+"',\n" +
" 'properties.bootstrap.servers' = '10.0.1.227:9092,10.0.1.226:9092,10.0.1.228:9092',\n" +
" 'properties.group.id' = 'flink-accountpay-binlog',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'canal-json' ,\n" +
" 'canal-json.table.include' = 'accountpay' ,\n" +
" 'canal-json.ignore-parse-errors' = 'true' \n" +
")";
return CREATE_SOURCE_SQL_TEMPLATE;
}
public static String createSinkSql(){
String CREATE_SINK_SQL = "CREATE TABLE huawei_hbase_sink_table (\n" +
" rowkey STRING,\n" +
" f ROW<saasid STRING,\n" +
" marketid STRING,\n" +
" operator_id STRING,\n" +
" out_id STRING,\n" +
" paymentplatform STRING,\n" +
" serveicetype STRING,\n" +
" total_fee STRING,\n" +
" status STRING,\n" +
" trad_desc STRING,\n" +
" `date` STRING,\n" +
" systemdate STRING,\n" +
" fund_bill_list STRING,\n" +
" buyer_logon_id STRING,\n" +
" buyer_user_id STRING,\n" +
" cashier STRING,\n" +
" seller_id STRING,\n" +
" trade_no STRING,\n" +
" is_subscribe STRING,\n" +
" isbalance STRING,\n" +
" posbatch STRING,\n" +
" invoiceno STRING,\n" +
" deductionfee STRING,\n" +
" merchantdiscount STRING,\n" +
" otherdescount STRING,\n" +
" barcode STRING,\n" +
" goodstag STRING,\n" +
" rds_id STRING,\n" +
" rds_name STRING,\n" +
" mqstatus STRING,\n" +
" trade_type STRING,\n" +
" notify_url STRING,\n" +
" currency STRING,\n" +
" settlementid STRING>,\n" +
" PRIMARY KEY (rowkey) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'original:accountpay',\n" +
" 'zookeeper.quorum' = '172.16.130.64:2181,172.16.128.15:2181,172.16.129.113:2181'" +
")";
return CREATE_SINK_SQL;
}
public static String createJobSql() {
StringBuilder stringBuilder = new StringBuilder();
String CREATE_JOB_SQL = "INSERT INTO huawei_hbase_sink_table\n" +
"SELECT " +
" CONCAT( CAST( (MOD(CAST(DATE_FORMAT(systemdate,'yyyyMMdd') AS INT),10)) AS STRING ) , '~' , DATE_FORMAT(systemdate,'yyyyMMdd'),'~' , out_id) ," +
// " '324234234324234' ," +
" ROW(" +
"saasid ,\n" +
"marketid ,\n" +
"operator_id ,\n" +
"out_id ,\n" +
"paymentplatform ,\n" +
"serveicetype ,\n" +
"total_fee ,\n" +
"status ,\n" +
"trad_desc ,\n" +
"`date` ,\n" +
"systemdate ,\n" +
"fund_bill_list ,\n" +
"buyer_logon_id ,\n" +
"buyer_user_id ,\n" +
"cashier ,\n" +
"seller_id ,\n" +
"trade_no ,\n" +
"is_subscribe ,\n" +
"isbalance ,\n" +
"posbatch ,\n" +
"invoiceno ,\n" +
"deductionfee ,\n" +
"merchantdiscount ,\n" +
"otherdescount ,\n" +
"barcode ,\n" +
"goodstag ,\n" +
"rds_id ,\n" +
"rds_name ,\n" +
"mqstatus ,\n" +
"trade_type ,\n" +
"notify_url ,\n" +
"currency ,\n" +
"settlementid " +
" ) FROM ( select * from ( ";
stringBuilder.append(CREATE_JOB_SQL).append( "select * from " + RDS_LIST.get(0).replace('-','_') );
for (int i = 1; i < RDS_LIST.size(); i++) {
stringBuilder.append (" union all select * from " + RDS_LIST.get(i).replace('-','_'));
}
stringBuilder.append(" ) t )");
return stringBuilder.toString();
}
public static void main(String[] args) throws Exception {
SessionClient session = new SessionClient(new AccountPayKafkaBinlogToHBase().getClass().getName(),"123.60.47.52", 8083, "streaming");
for (String rds : RDS_LIST) {
System.out.println(session.submitStatement(createSourceSql(rds)).getResults());
}
System.out.println(session.submitStatement(createSinkSql()).getResults());
System.out.println(session.submitStatement(createJobSql()).getResults());
session.close();
}
}
package com.miya.ali;
public class KafkaBinlogToHBase {
}
package com.miya.ali;
import com.miya.common.SessionClient;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
public class ReturnPayKafkaBinlogToHBase {
private static final List<String> RDS_LIST = Arrays.asList("rds-binlog-mbox", "rds-binlog-37",
"rds-binlog-hrhn_zx", "rds-binlog-hrhd_zx", "rds-binlog-qdm", "rds-binlog-watson_zx",
"rds-binlog-hongkong", "rds-binlog-lawson", "rds-binlog-165", "rds-binlog-aeon", "rds-binlog-metro",
"rds-binlog-yh", "rds-binlog-cdfy");
public static String createSourceSql(String rds) {
String CREATE_SOURCE_SQL_TEMPLATE = "CREATE TABLE " + rds.replace('-','_') + " (\n" +
"saasid STRING ,\n" +
"marketid STRING ,\n" +
"operator_id STRING ,\n" +
"out_id STRING ,\n" +
"out_request_no STRING ,\n" +
"paymentplatform STRING ,\n" +
"serveicetype STRING ,\n" +
"total_fee STRING ,\n" +
"status STRING ,\n" +
"trad_desc STRING ,\n" +
"`date` STRING ,\n" +
"miyadate STRING ,\n" +
"retotal_fee STRING ,\n" +
"cashier STRING ,\n" +
"posbatch STRING ,\n" +
"invoiceno STRING ,\n" +
"fundbilllist STRING ,\n" +
"seller_id STRING ,\n" +
"trade_no STRING ,\n" +
"orderfee STRING ,\n" +
"rds_id STRING ,\n" +
"rds_name STRING ,\n" +
"currency STRING ,\n" +
"settlementid STRING ,\n" +
"trade_type STRING ,\n" +
"deductionfee STRING ,\n" +
"merchantdiscount STRING ,\n" +
"otherdescount STRING "+
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = '"+rds+"',\n" +
" 'properties.bootstrap.servers' = '10.0.1.227:9092,10.0.1.226:9092,10.0.1.228:9092',\n" +
" 'properties.group.id' = 'flink-returnpay-binlog',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'canal-json' ,\n" +
" 'canal-json.table.include' = 'returnpay' ,\n" +
" 'canal-json.ignore-parse-errors' = 'true' \n" +
")";
return CREATE_SOURCE_SQL_TEMPLATE;
}
public static String createSinkSql(){
String CREATE_SINK_SQL = "CREATE TABLE huawei_hbase_sink_table (\n" +
" rowkey STRING,\n" +
" f ROW<saasid STRING,\n" +
" marketid STRING ,\n" +
" operator_id STRING ,\n" +
" out_id STRING ,\n" +
" out_request_no STRING ,\n" +
" paymentplatform STRING ,\n" +
" serveicetype STRING ,\n" +
" total_fee STRING ,\n" +
" status STRING ,\n" +
" trad_desc STRING ,\n" +
" `date` STRING ,\n" +
" miyadate STRING,\n" +
" retotal_fee STRING ,\n" +
" cashier STRING ,\n" +
" posbatch STRING ,\n" +
" invoiceno STRING ,\n" +
" fundbilllist STRING ,\n" +
" seller_id STRING ,\n" +
" trade_no STRING ,\n" +
" orderfee STRING ,\n" +
" rds_id STRING ,\n" +
" rds_name STRING ,\n" +
" currency STRING ,\n" +
" settlementid STRING ,\n" +
" trade_type STRING ,\n" +
" deductionfee STRING ,\n" +
" merchantdiscount STRING ,\n" +
" otherdescount STRING >,\n" +
" PRIMARY KEY (rowkey) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'original:returnpay',\n" +
" 'zookeeper.quorum' = '172.16.130.64:2181,172.16.128.15:2181,172.16.129.113:2181'" +
")";
return CREATE_SINK_SQL;
}
public static String createJobSql() {
StringBuilder stringBuilder = new StringBuilder();
String CREATE_JOB_SQL = "INSERT INTO huawei_hbase_sink_table\n" +
"SELECT " +
" CONCAT( CAST( (MOD(CAST(DATE_FORMAT(miyadate,'yyyyMMdd') AS INT),10)) AS STRING ) , '~' , DATE_FORMAT(miyadate,'yyyyMMdd'),'~' , out_id) ," +
// " CONCAT(out_id,'~',CAST( rand() as STRING)) ," +
// " UUID() ," +
" ROW(" +
"saasid ,\n" +
"marketid ,\n" +
"operator_id ,\n" +
"out_id ,\n" +
"out_request_no ,\n" +
"paymentplatform ,\n" +
"serveicetype ,\n" +
"total_fee ,\n" +
"status ,\n" +
"trad_desc ,\n" +
"`date` ,\n" +
"miyadate ,\n" +
"retotal_fee ,\n" +
"cashier ,\n" +
"posbatch ,\n" +
"invoiceno ,\n" +
"fundbilllist ,\n" +
"seller_id ,\n" +
"trade_no ,\n" +
"orderfee ,\n" +
"rds_id ,\n" +
"rds_name ,\n" +
"currency ,\n" +
"settlementid ,\n" +
"trade_type ,\n" +
"deductionfee ,\n" +
"merchantdiscount ,\n" +
"otherdescount \n" +
" ) FROM ( select * from ( ";
stringBuilder.append(CREATE_JOB_SQL).append( "select * from " + RDS_LIST.get(0).replace('-','_') );
for (int i = 1; i < RDS_LIST.size(); i++) {
stringBuilder.append (" union all select * from " + RDS_LIST.get(i).replace('-','_'));
}
stringBuilder.append(" ) t )");
return stringBuilder.toString();
}
public static void main(String[] args) throws Exception {
SessionClient session = new SessionClient(new ReturnPayKafkaBinlogToHBase().getClass().getName(),"123.60.47.52", 8083, "streaming");
for (String rds : RDS_LIST) {
System.out.println(session.submitStatement(createSourceSql(rds)).getResults());
}
System.out.println(session.submitStatement(createSinkSql()).getResults());
System.out.println(session.submitStatement(createJobSql()).getResults());
session.close();
}
}
...@@ -42,8 +42,53 @@ public class AccountPayKafkaBinlogToHBase { ...@@ -42,8 +42,53 @@ public class AccountPayKafkaBinlogToHBase {
"settlementid STRING\n" + "settlementid STRING\n" +
") WITH (\n" + ") WITH (\n" +
" 'connector' = 'kafka',\n" + " 'connector' = 'kafka',\n" +
" 'topic' = 'accountpay',\n" + " 'topic' = 'rds-binlog-test',\n" +
" 'properties.bootstrap.servers' = '10.0.1.227:9092,10.0.1.226:9092,10.0.1.228:9092',\n" + " 'properties.bootstrap.servers' = '172.16.7.171:9092,172.16.7.8:9092,172.16.7.85:9092',\n" +
" 'properties.group.id' = 'flink-accountpay-binlog',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true' ," +
" 'format' = 'json'\n" +
")";
private static final String CREATE_SOURCE_SQL0 = "CREATE TABLE huawei_kafka_binlog_source_table0 (\n" +
"saasid STRING,\n" +
"marketid STRING,\n" +
"operator_id STRING,\n" +
"out_id STRING,\n" +
"paymentplatform STRING,\n" +
"serveicetype STRING,\n" +
"total_fee STRING,\n" +
"status STRING,\n" +
"trad_desc STRING,\n" +
"`date` STRING,\n" +
"systemdate STRING,\n" +
"fund_bill_list STRING,\n" +
"buyer_logon_id STRING,\n" +
"buyer_user_id STRING,\n" +
"cashier STRING,\n" +
"seller_id STRING,\n" +
"trade_no STRING,\n" +
"is_subscribe STRING,\n" +
"isbalance STRING,\n" +
"posbatch STRING,\n" +
"invoiceno STRING,\n" +
"deductionfee STRING,\n" +
"merchantdiscount STRING,\n" +
"otherdescount STRING,\n" +
"barcode STRING,\n" +
"goodstag STRING,\n" +
"rds_id STRING,\n" +
"rds_name STRING,\n" +
"mqstatus STRING,\n" +
"trade_type STRING,\n" +
"notify_url STRING,\n" +
"currency STRING,\n" +
"settlementid STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'rds-binlog-test0',\n" +
" 'properties.bootstrap.servers' = '172.16.7.171:9092,172.16.7.8:9092,172.16.7.85:9092',\n" +
" 'properties.group.id' = 'flink-accountpay-binlog',\n" + " 'properties.group.id' = 'flink-accountpay-binlog',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" +
" 'json.fail-on-missing-field' = 'false',\n" + " 'json.fail-on-missing-field' = 'false',\n" +
...@@ -130,7 +175,10 @@ public class AccountPayKafkaBinlogToHBase { ...@@ -130,7 +175,10 @@ public class AccountPayKafkaBinlogToHBase {
"notify_url ,\n" + "notify_url ,\n" +
"currency ,\n" + "currency ,\n" +
"settlementid "+ "settlementid "+
" ) FROM huawei_kafka_binlog_source_table"; " ) FROM " +
" (select * from huawei_kafka_binlog_source_table union all " +
" select * from huawei_kafka_binlog_source_table0" +
") ";
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
...@@ -138,6 +186,7 @@ public class AccountPayKafkaBinlogToHBase { ...@@ -138,6 +186,7 @@ public class AccountPayKafkaBinlogToHBase {
System.out.println(session.submitStatement(CREATE_SOURCE_SQL).getResults()); System.out.println(session.submitStatement(CREATE_SOURCE_SQL).getResults());
System.out.println(session.submitStatement(CREATE_SOURCE_SQL0).getResults());
System.out.println(session.submitStatement(CREATE_SINK_SQL).getResults()); System.out.println(session.submitStatement(CREATE_SINK_SQL).getResults());
System.out.println(session.submitStatement(CREATE_JOB_SQL).getResults()); System.out.println(session.submitStatement(CREATE_JOB_SQL).getResults());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment