Commit 040692f5 authored by xiebaofa's avatar xiebaofa

SLS MIyaPayLog 任务

parent 7d01984c
......@@ -14,7 +14,7 @@ public class AccountPayKafkaBinlogToHBase {
public static String createSourceSql(String rds) {
String CREATE_SOURCE_SQL_TEMPLATE = "CREATE TABLE " + rds.replace('-','_') + " (\n" +
String CREATE_SOURCE_SQL_TEMPLATE = "CREATE TABLE " + rds.replace('-','_') + "_accountpay (\n" +
"saasid STRING,\n" +
"marketid STRING,\n" +
"operator_id STRING,\n" +
......@@ -53,8 +53,8 @@ public class AccountPayKafkaBinlogToHBase {
" 'connector' = 'kafka',\n" +
" 'topic' = '"+rds+"',\n" +
" 'properties.bootstrap.servers' = '10.0.1.227:9092,10.0.1.226:9092,10.0.1.228:9092',\n" +
" 'properties.group.id' = 'flink-accountpay-binlog',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'properties.group.id' = 'flink-accountpay-binlog-co',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'canal-json' ,\n" +
" 'canal-json.table.include' = 'accountpay' ,\n" +
" 'canal-json.ignore-parse-errors' = 'true' \n" +
......@@ -65,7 +65,7 @@ public class AccountPayKafkaBinlogToHBase {
public static String createSinkSql(){
String CREATE_SINK_SQL = "CREATE TABLE huawei_hbase_sink_table (\n" +
String CREATE_SINK_SQL = "CREATE TABLE hbase_sink_table_accountpay (\n" +
" rowkey STRING,\n" +
" f ROW<saasid STRING,\n" +
" marketid STRING,\n" +
......@@ -113,9 +113,9 @@ public class AccountPayKafkaBinlogToHBase {
public static String createJobSql() {
StringBuilder stringBuilder = new StringBuilder();
String CREATE_JOB_SQL = "INSERT INTO huawei_hbase_sink_table\n" +
String CREATE_JOB_SQL = "INSERT INTO hbase_sink_table_accountpay\n" +
"SELECT " +
" CONCAT( CAST( (MOD(CAST(DATE_FORMAT(systemdate,'yyyyMMdd') AS INT),10)) AS STRING ) , '~' , DATE_FORMAT(systemdate,'yyyyMMdd'),'~' , out_id) ," +
" CONCAT( CAST( (MOD(CAST(DATE_FORMAT(systemdate,'yyyyMMdd') AS INT),10)) AS STRING ) , '~' , DATE_FORMAT(systemdate,'yyyyMMdd'),'~' , out_id , '~' ,saasid) ," +
" ROW(" +
" saasid ,\n" +
" marketid ,\n" +
......@@ -152,9 +152,9 @@ public class AccountPayKafkaBinlogToHBase {
" settlementid " +
" ) FROM ( select * from ( ";
stringBuilder.append(CREATE_JOB_SQL).append( "select * from " + RDS_LIST.get(0).replace('-','_') );
stringBuilder.append(CREATE_JOB_SQL).append( "select * from " + RDS_LIST.get(0).replace('-','_')+"_accountpay" );
for (int i = 1; i < RDS_LIST.size(); i++) {
stringBuilder.append (" union all select * from " + RDS_LIST.get(i).replace('-','_'));
stringBuilder.append (" union all select * from " + RDS_LIST.get(i).replace('-','_')+"_accountpay");
}
stringBuilder.append(" ) t )");
return stringBuilder.toString();
......
package com.miya.ali;
public class KafkaPayLogsToHBase {
}
package com.miya.ali;
import com.miya.Demo;
import com.miya.common.util.SessionClient;
import java.sql.SQLException;
public class MiyaPayLogSLSToHBase {
private static final String CREATE_SOURCE_SQL =
"CREATE TABLE source_aliyun_sls_miyapay_log (\n" +
" __source__ STRING,\n" +
" __hostname__ STRING,\n" +
" __path__ STRING,\n" +
" __receive_time__ STRING,\n" +
" consume_time STRING,\n" +
" `ip` STRING,\n" +
" market_code STRING,\n" +
" merchant_code STRING,\n" +
" `message` STRING,\n" +
" message_type STRING,\n" +
" pay_type STRING,\n" +
" paymentplatform STRING,\n" +
" `time` STRING,\n" +
" trade_no STRING,\n" +
" trade_step STRING,\n" +
" trade_type STRING,\n" +
" uuid_string STRING\n" +
") WITH (\n" +
"'connector' = 'kafka',\n" +
"'topic' = 'sls-miyapay-log',\n" +
"'properties.bootstrap.servers' = '10.0.1.227:9092,10.0.1.226:9092,10.0.1.228:9092',\n" +
"'properties.group.id' = 'testGroup',\n" +
"'scan.startup.mode' = 'latest-offset','format' = 'json'\n" +
")\n";
private static final String CREATE_SINK_SQL = "CREATE TABLE sink_huawei_hbase_sls_miyapay_log (\n" +
" rowkey STRING,\n" +
" f ROW<source_ip STRING,\n" +
" hostname STRING,\n" +
" path STRING,\n" +
" receive_time STRING,\n" +
" consume_time STRING,\n" +
" ip STRING,\n" +
" market_code STRING,\n" +
" merchant_code STRING,\n" +
" message STRING,\n" +
" message_type STRING,\n" +
" pay_type STRING,\n" +
" paymentplatform STRING,\n" +
" trade_time STRING,\n" +
" trade_no STRING,\n" +
" trade_step STRING,\n" +
" trade_type STRING,\n" +
" uuid_string STRING,\n" +
" pt_day STRING>,\n" +
" PRIMARY KEY (rowkey) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'original:sls_miyapay_log',\n" +
" 'zookeeper.quorum' = '172.16.130.64:2181,172.16.128.15:2181,172.16.129.113:2181'" +
")";
private static final String CREATE_JOB_SQL = "insert into sink_huawei_hbase_sls_miyapay_log\n" +
"SELECT\n" +
" CONCAT( CAST( (MOD(CAST(DATE_FORMAT(trade_time,'yyyyMMdd') AS INT),10)) AS STRING ) , '~' , DATE_FORMAT(trade_time,'yyyyMMdd'),'~' , uuid_string) as rowkey,\n" +
" ROW(\n" +
" source_ip,\n" +
" hostname,\n" +
" path,\n" +
" receive_time,\n" +
" consume_time,\n" +
" ip,\n" +
" market_code,\n" +
" merchant_code,\n" +
" message,\n" +
" message_type,\n" +
" pay_type,\n" +
" paymentplatform,\n" +
" trade_time,\n" +
" trade_no,\n" +
" trade_step,\n" +
" trade_type,\n" +
" uuid_string,\n" +
" pt_day) as f\n" +
"from (SELECT \n" +
" __source__ AS source_ip,\n" +
" __hostname__ AS hostname,\n" +
" __path__ AS path,\n" +
" __receive_time__ AS receive_time,\n" +
" consume_time,\n" +
" `ip`,\n" +
" market_code,\n" +
" merchant_code,\n" +
" `message`,\n" +
" message_type,\n" +
" pay_type,\n" +
" paymentplatform,\n" +
" `time` AS trade_time,\n" +
" trade_no,\n" +
" trade_step,\n" +
" trade_type,\n" +
" uuid_string,\n" +
" DATE_FORMAT(`time`, 'yyyyMMdd') AS pt_day\n" +
"FROM\n" +
" source_aliyun_sls_miyapay_log\n" +
"WHERE\n" +
" trade_no <> 'null')\n";
public static void main(String[] args) throws Exception {
SessionClient session = new SessionClient(new Demo().getClass().getName(),"123.60.47.52", 8083, "streaming");
kafka2kafka(session);
session.close();
}
private static void kafka2kafka(SessionClient session) throws SQLException {
System.out.println(session.submitStatement(CREATE_SOURCE_SQL).getResults());
System.out.println(session.submitStatement(CREATE_SINK_SQL).getResults());
System.out.println(session.submitStatement(CREATE_JOB_SQL).getResults());
}
}
......@@ -2,8 +2,7 @@ package com.miya.ali;
import com.miya.common.util.SessionClient;
import java.util.Arrays;
import java.util.List;
import java.util.*;
public class ReturnPayKafkaBinlogToHBase {
......@@ -13,9 +12,21 @@ public class ReturnPayKafkaBinlogToHBase {
"rds-binlog-hongkong", "rds-binlog-lawson", "rds-binlog-165", "rds-binlog-aeon", "rds-binlog-metro",
"rds-binlog-yh", "rds-binlog-cdfy");
private static final Map<String, List<String>> RDS_MAP = new HashMap<String, List<String>>() {
{
put("rds-binlog-cdfy", Arrays.asList("5","chengda"));
put("rds-binlog-yh", Arrays.asList("8","yh"));
put("rds-binlog-165", Arrays.asList("10","165"));
put("rds-binlog-metro", Arrays.asList("12","mdl"));
put("rds-binlog-aeon", Arrays.asList("14","yw"));
put("rds-binlog-hrhd_zx", Arrays.asList("20","huarun"));
put("rds-binlog-hrhn_zx", Arrays.asList("21","huarun"));
}
};
public static String createSourceSql(String rds) {
String CREATE_SOURCE_SQL_TEMPLATE = "CREATE TABLE " + rds.replace('-','_') + " (\n" +
String CREATE_SOURCE_SQL_TEMPLATE = "CREATE TABLE " + rds.replace('-','_') + "_returnpay (\n" +
"saasid STRING ,\n" +
"marketid STRING ,\n" +
"operator_id STRING ,\n" +
......@@ -48,8 +59,8 @@ public class ReturnPayKafkaBinlogToHBase {
" 'connector' = 'kafka',\n" +
" 'topic' = '"+rds+"',\n" +
" 'properties.bootstrap.servers' = '10.0.1.227:9092,10.0.1.226:9092,10.0.1.228:9092',\n" +
" 'properties.group.id' = 'flink-returnpay-binlog',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'properties.group.id' = 'flink-returnpay-binlog-co',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'canal-json' ,\n" +
" 'canal-json.table.include' = 'returnpay' ,\n" +
" 'canal-json.ignore-parse-errors' = 'true' \n" +
......@@ -60,7 +71,7 @@ public class ReturnPayKafkaBinlogToHBase {
public static String createSinkSql(){
String CREATE_SINK_SQL = "CREATE TABLE huawei_hbase_sink_table (\n" +
String CREATE_SINK_SQL = "CREATE TABLE hbase_sink_table_returnpay (\n" +
" rowkey STRING,\n" +
" f ROW<" +
" saasid STRING,\n" +
......@@ -104,9 +115,9 @@ public class ReturnPayKafkaBinlogToHBase {
public static String createJobSql() {
StringBuilder stringBuilder = new StringBuilder();
String CREATE_JOB_SQL = "INSERT INTO huawei_hbase_sink_table\n" +
String CREATE_JOB_SQL = "INSERT INTO hbase_sink_table_returnpay\n" +
"SELECT " +
" CONCAT( CAST( (MOD(CAST(DATE_FORMAT(miyadate,'yyyyMMdd') AS INT),10)) AS STRING ) , '~' , DATE_FORMAT(miyadate,'yyyyMMdd'),'~' , out_id) ," +
" CONCAT( CAST( (MOD(CAST(DATE_FORMAT(miyadate,'yyyyMMdd') AS INT),10)) AS STRING ) , '~' , DATE_FORMAT(miyadate,'yyyyMMdd'),'~' , out_request_no,'~' , saasid) ," +
" ROW(" +
"saasid ,\n" +
"marketid ,\n" +
......@@ -138,9 +149,9 @@ public class ReturnPayKafkaBinlogToHBase {
"otherdescount \n" +
" ) FROM ( select * from ( ";
stringBuilder.append(CREATE_JOB_SQL).append( "select * from " + RDS_LIST.get(0).replace('-','_') );
stringBuilder.append(CREATE_JOB_SQL).append( "select * from " + RDS_LIST.get(0).replace('-','_')+"_returnpay" );
for (int i = 1; i < RDS_LIST.size(); i++) {
stringBuilder.append (" union all select * from " + RDS_LIST.get(i).replace('-','_'));
stringBuilder.append (" union all select * from " + RDS_LIST.get(i).replace('-','_')+"_returnpay");
}
stringBuilder.append(" ) t )");
return stringBuilder.toString();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment