Commit 0b18d42d authored by xiebaofa

First commit

/*.iml
/target/
/.idea/
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.miya</groupId>
<artifactId>flink-jdbc</artifactId>
<version>1.0-SNAPSHOT</version>
<name>flink-jdbc</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.flink</groupId>
<artifactId>flink-jdbc-driver</artifactId>
<version>0.1-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
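<!-- Shade all dependencies into a single runnable jar with com.miya.App as the entry point. -->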
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>com.miya.App</Main-Class>
<X-Compile-Source-JDK>${maven.compiler.source}</X-Compile-Source-JDK>
<X-Compile-Target-JDK>${maven.compiler.target}</X-Compile-Target-JDK>
</manifestEntries>
</transformer>
</transformers>
<relocations combine.self="override">
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.miya;
import com.miya.common.SessionClient;
import java.sql.SQLException;
/**
 * Entry point that submits Flink SQL pipelines (Kafka-to-Kafka and
 * Kafka-to-HBase) to a Flink SQL gateway session.
 */
public class App {
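// Source table: JSON records from Kafka; `ts` is the Kafka record timestamp exposed through connector metadata.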
private static final String CREATE_SOURCE_SQL = "CREATE TABLE source_table_4 (\n" +
" `mysql_type` STRING,\n" +
" `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'rds-binlog-test',\n" +
" 'properties.bootstrap.servers' = '172.16.7.171:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json'\n" +
")";
private static final String CREATE_SINK_SQL = "CREATE TABLE sink_table_4 (\n" +
" `mysql_type` STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'rds-binlog-test2',\n" +
" 'properties.bootstrap.servers' = '172.16.7.171:9092',\n" +
" 'format' = 'csv'\n" +
")";
private static final String CREATE_PRINT_SQL = "CREATE TABLE print_table3(`mysql_type` STRING) " +
"WITH ('connector'='filesystem', \n" +
" 'path'='file:///tmp/abc', \n" +
" 'format'='csv' )";
private static final String CREATE_HBASE_SQL = "CREATE TABLE hTable (\n" +
" rowkey INT,\n" +
" family1 ROW<q1 STRING>,\n" +
" PRIMARY KEY (rowkey) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'mytable',\n" +
" 'zookeeper.quorum' = '172.16.128.133:2181'\n" +
")" ;
private static final String CREATE_HBASE_SQL2 = "CREATE TABLE hTable2 (\n" +
" rowkey INT,\n" +
" family1 ROW<q1 STRING>,\n" +
" PRIMARY KEY (rowkey) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'mytable2',\n" +
" 'zookeeper.quorum' = '172.16.128.3:2181',\n" +
" 'zookeeper.znode.parent' = '/hbase'\n" +
")" ;
private static final String SHOW_TABLES_SQL = "show tables";
private static final String CREATE_JOB_SQL = "insert into sink_table_4(mysql_type) select mysql_type from source_table_4";
private static final String CREATE_JOB_SQL2 = "INSERT INTO hTable\n" +
"SELECT CAST(mysql_type as INT),ROW(mysql_type) FROM source_table_4";
private static final String CREATE_JOB_SQL3 = "INSERT INTO hTable2\n" +
"SELECT CAST(mysql_type as INT),ROW(mysql_type) FROM source_table_4";
// private static final String CREATE_JOB_SQL = " select mysql_type from source_table_4";
public static void main(String[] args) throws Exception {
SessionClient session = new SessionClient(App.class.getName(), "123.60.47.52", 8083, "streaming");
kafka2kafka(session);
session.close();
}
private static void kafka2hbase(SessionClient session) throws SQLException {
System.out.println(session.submitStatement(CREATE_SOURCE_SQL).getResults());
System.out.println(session.submitStatement(CREATE_HBASE_SQL2).getResults());
System.out.println(session.submitStatement(CREATE_JOB_SQL3).getResults());
}
private static void kafka2kafka(SessionClient session) throws SQLException {
System.out.println(session.submitStatement(CREATE_SOURCE_SQL).getResults());
System.out.println(session.submitStatement(CREATE_SINK_SQL).getResults());
System.out.println(session.submitStatement(CREATE_JOB_SQL).getResults());
}
}
package com.miya.ali;
public class KafkaBinlogToHBase {
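// Placeholder for a Kafka-binlog-to-HBase pipeline; not yet implemented in this commit.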
}
package com.miya.ali;
public class KafkaPayLogsToHBase {
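// Placeholder for a Kafka-pay-logs-to-HBase pipeline; not yet implemented in this commit.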
}
package com.miya.common;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.ververica.flink.table.gateway.rest.result.ConstantNames;
import com.ververica.flink.table.gateway.rest.result.ResultSet;
import org.apache.flink.api.common.JobID;
import org.apache.flink.types.Either;
/**
* Utility class to handle REST data structures.
*/
public class RestUtils {
public static JobID getJobID(ResultSet resultSet) {
if (resultSet.getColumns().size() != 1) {
throw new IllegalArgumentException("Should contain only one column. This is a bug.");
}
if (resultSet.getColumns().get(0).getName().equals(ConstantNames.JOB_ID)) {
String jobId = (String) resultSet.getData().get(0).getField(0);
return JobID.fromHexString(jobId);
} else {
throw new IllegalArgumentException("Column name should be " + ConstantNames.JOB_ID + ". This is a bug.");
}
}
public static Either<JobID, ResultSet> getEitherJobIdOrResultSet(ResultSet resultSet) {
if (resultSet.getColumns().size() == 1 && resultSet.getColumns().get(0).getName()
.equals(ConstantNames.JOB_ID)) {
String jobId = (String) resultSet.getData().get(0).getField(0);
return Either.Left(JobID.fromHexString(jobId));
} else {
return Either.Right(resultSet);
}
}
}
package com.miya.common;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.ververica.flink.table.gateway.rest.handler.GetInfoHeaders;
import com.ververica.flink.table.gateway.rest.handler.JobCancelHeaders;
import com.ververica.flink.table.gateway.rest.handler.ResultFetchHeaders;
import com.ververica.flink.table.gateway.rest.handler.SessionCloseHeaders;
import com.ververica.flink.table.gateway.rest.handler.SessionCreateHeaders;
import com.ververica.flink.table.gateway.rest.handler.SessionHeartbeatHeaders;
import com.ververica.flink.table.gateway.rest.handler.StatementExecuteHeaders;
import com.ververica.flink.table.gateway.rest.message.GetInfoResponseBody;
import com.ververica.flink.table.gateway.rest.message.ResultFetchMessageParameters;
import com.ververica.flink.table.gateway.rest.message.ResultFetchRequestBody;
import com.ververica.flink.table.gateway.rest.message.ResultFetchResponseBody;
import com.ververica.flink.table.gateway.rest.message.SessionCreateRequestBody;
import com.ververica.flink.table.gateway.rest.message.SessionJobMessageParameters;
import com.ververica.flink.table.gateway.rest.message.SessionMessageParameters;
import com.ververica.flink.table.gateway.rest.message.StatementExecuteRequestBody;
import com.ververica.flink.table.gateway.rest.message.StatementExecuteResponseBody;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.ExecutorUtils;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* A client to connect to Flink SQL gateway.
*/
public class SessionClient {
private final String serverHost;
private final int serverPort;
private final String sessionName;
private final String planner;
private final String executionType;
private final RestClient restClient;
private final ExecutorService executor;
private volatile String sessionId;
private volatile boolean isClosed = false;
public SessionClient(String jobName, String serverHost, int serverPort, String executionType) throws Exception {
this.serverHost = serverHost;
this.serverPort = serverPort;
this.sessionName = jobName+"-Session";
this.planner = "blink";
this.executionType = executionType;
this.executor = Executors.newFixedThreadPool(4, new ExecutorThreadFactory(jobName+"-IO"));
this.restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), executor);
connectInternal();
}
public SessionClient(
String serverHost,
int serverPort,
String sessionName,
String planner,
String executionType,
String threadName)
throws Exception {
this.serverHost = serverHost;
this.serverPort = serverPort;
this.sessionName = sessionName;
this.planner = planner;
this.executionType = executionType;
this.executor = Executors.newFixedThreadPool(4, new ExecutorThreadFactory(threadName));
this.restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), executor);
connectInternal();
}
public String getServerHost() {
return serverHost;
}
public int getServerPort() {
return serverPort;
}
public String getPlanner() {
return planner;
}
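/**
 * Creates a session on the gateway; the returned session id scopes every
 * subsequent statement, heartbeat and job request.
 */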
private void connectInternal() throws Exception {
this.sessionId = restClient.sendRequest(
serverHost,
serverPort,
SessionCreateHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
new SessionCreateRequestBody(sessionName, planner, executionType, Collections.emptyMap()))
.get().getSessionId();
}
public synchronized void close() throws Exception {
if (isClosed) {
return;
}
isClosed = true;
try {
restClient.sendRequest(
serverHost,
serverPort,
SessionCloseHeaders.getInstance(),
new SessionMessageParameters(sessionId),
EmptyRequestBody.getInstance()).get();
} finally {
restClient.shutdown(Time.seconds(5));
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, executor);
}
}
public synchronized void sendHeartbeat() throws SQLException {
checkState();
try {
restClient.sendRequest(
serverHost,
serverPort,
SessionHeartbeatHeaders.getInstance(),
new SessionMessageParameters(sessionId),
EmptyRequestBody.getInstance())
.get();
} catch (Exception e) {
throw new SQLException("Failed to send heartbeat to server", e);
}
}
public StatementExecuteResponseBody submitStatement(String stmt) throws SQLException {
return submitStatement(stmt, Long.MAX_VALUE);
}
public synchronized StatementExecuteResponseBody submitStatement(String stmt, long executionTimeoutMillis)
throws SQLException {
checkState();
try {
return restClient.sendRequest(
serverHost,
serverPort,
StatementExecuteHeaders.getInstance(),
new SessionMessageParameters(sessionId),
new StatementExecuteRequestBody(stmt, executionTimeoutMillis))
.get();
} catch (Exception e) {
throw new SQLException("Failed to submit statement `" + stmt + "` to server", e);
}
}
public synchronized void cancelJob(JobID jobId) throws SQLException {
checkState();
try {
restClient.sendRequest(
serverHost,
serverPort,
JobCancelHeaders.getInstance(),
new SessionJobMessageParameters(sessionId, jobId),
EmptyRequestBody.getInstance())
.get();
} catch (Exception e) {
throw new SQLException("Failed to cancel job " + jobId.toString(), e);
}
}
public synchronized ResultFetchResponseBody fetchResult(JobID jobId, long token) throws SQLException {
return fetchResult(jobId, token, null);
}
public synchronized ResultFetchResponseBody fetchResult(
JobID jobId, long token, Integer fetchSize) throws SQLException {
checkState();
try {
return restClient.sendRequest(
serverHost,
serverPort,
ResultFetchHeaders.getInstance(),
new ResultFetchMessageParameters(sessionId, jobId, token),
new ResultFetchRequestBody(fetchSize))
.get();
} catch (Exception e) {
throw new SQLException(
"Failed to fetch result for job " + jobId.toString() +
" (token = " + token + ", fetchSize = " + fetchSize + ")",
e.getCause());
}
}
public GetInfoResponseBody getInfo() throws SQLException {
checkState();
try {
return restClient.sendRequest(
serverHost,
serverPort,
GetInfoHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.get();
} catch (Exception e) {
throw new SQLException("Failed to get server info", e);
}
}
private void checkState() {
if (isClosed) {
throw new IllegalStateException("Session is already closed.");
}
}
}
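A minimal usage sketch tying the two helpers above together (hypothetical class, not part of this commit). It assumes the gateway endpoint used elsewhere in this repo, and that ResultFetchResponseBody.getResults() exposes the fetched ResultSets.
package com.miya.example;
import com.miya.common.RestUtils;
import com.miya.common.SessionClient;
import com.ververica.flink.table.gateway.rest.result.ResultSet;
import org.apache.flink.api.common.JobID;
import org.apache.flink.types.Either;
public class SubmitAndFetch {
    public static void main(String[] args) throws Exception {
        // Hypothetical example: endpoint taken from the other jobs in this repo.
        SessionClient session = new SessionClient(SubmitAndFetch.class.getName(), "123.60.47.52", 8083, "streaming");
        try {
            for (ResultSet rs : session.submitStatement("show tables").getResults()) {
                Either<JobID, ResultSet> either = RestUtils.getEitherJobIdOrResultSet(rs);
                if (either.isLeft()) {
                    // The statement launched a job; fetch its first result page (token 0).
                    System.out.println(session.fetchResult(either.left(), 0).getResults());
                } else {
                    // The statement returned its result inline.
                    System.out.println(either.right());
                }
            }
        } finally {
            session.close();
        }
    }
}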
package com.miya.hive;
import com.miya.App;
import com.miya.common.SessionClient;
public class OriginalProcess {
private static final String SQL1 = "use catalog myhive";
private static final String SQL3 = "INSERT into accountpay_p " +
"SELECT *,DATE_FORMAT(`date`,'yyyyMMdd') from accountpay ";
public static void main(String[] args) throws Exception {
SessionClient session = new SessionClient(App.class.getName(), "123.60.47.52", 8083, "streaming");
System.out.println(session.submitStatement(SQL1).getResults());
System.out.println(session.submitStatement(SQL3).getResults());
session.close();
}
}
package com.miya.huawei;
import com.miya.common.SessionClient;
public class AccountPayKafkaBinlogToHBase {
private static final String CREATE_SOURCE_SQL = "CREATE TABLE huawei_kafka_binlog_source_table (\n" +
"saasid STRING,\n" +
"marketid STRING,\n" +
"operator_id STRING,\n" +
"out_id STRING,\n" +
"paymentplatform STRING,\n" +
"serveicetype STRING,\n" +
"total_fee STRING,\n" +
"status STRING,\n" +
"trad_desc STRING,\n" +
"`date` STRING,\n" +
"systemdate STRING,\n" +
"fund_bill_list STRING,\n" +
"buyer_logon_id STRING,\n" +
"buyer_user_id STRING,\n" +
"cashier STRING,\n" +
"seller_id STRING,\n" +
"trade_no STRING,\n" +
"is_subscribe STRING,\n" +
"isbalance STRING,\n" +
"posbatch STRING,\n" +
"invoiceno STRING,\n" +
"deductionfee STRING,\n" +
"merchantdiscount STRING,\n" +
"otherdescount STRING,\n" +
"barcode STRING,\n" +
"goodstag STRING,\n" +
"rds_id STRING,\n" +
"rds_name STRING,\n" +
"mqstatus STRING,\n" +
"trade_type STRING,\n" +
"notify_url STRING,\n" +
"currency STRING,\n" +
"settlementid STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'accountpay',\n" +
" 'properties.bootstrap.servers' = '10.0.1.227:9092,10.0.1.226:9092,10.0.1.228:9092',\n" +
" 'properties.group.id' = 'flink-accountpay-binlog',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true' ," +
" 'format' = 'json'\n" +
")";
private static final String CREATE_SINK_SQL = "CREATE TABLE huawei_hbase_sink_table (\n" +
" rowkey STRING,\n" +
" accountpay_columns ROW<saasid STRING,\n" +
" marketid STRING,\n" +
" operator_id STRING,\n" +
" out_id STRING,\n" +
" paymentplatform STRING,\n" +
" serveicetype STRING,\n" +
" total_fee STRING,\n" +
" status STRING,\n" +
" trad_desc STRING,\n" +
" `date` STRING,\n" +
" systemdate STRING,\n" +
" fund_bill_list STRING,\n" +
" buyer_logon_id STRING,\n" +
" buyer_user_id STRING,\n" +
" cashier STRING,\n" +
" seller_id STRING,\n" +
" trade_no STRING,\n" +
" is_subscribe STRING,\n" +
" isbalance STRING,\n" +
" posbatch STRING,\n" +
" invoiceno STRING,\n" +
" deductionfee STRING,\n" +
" merchantdiscount STRING,\n" +
" otherdescount STRING,\n" +
" barcode STRING,\n" +
" goodstag STRING,\n" +
" rds_id STRING,\n" +
" rds_name STRING,\n" +
" mqstatus STRING,\n" +
" trade_type STRING,\n" +
" notify_url STRING,\n" +
" currency STRING,\n" +
" settlementid STRING>,\n" +
" PRIMARY KEY (rowkey) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'original:accountpay',\n" +
" 'zookeeper.quorum' = '172.16.130.64:2181,172.16.128.15:2181,172.16.129.113:2181'" +
")" ;
private static final String CREATE_JOB_SQL = "INSERT INTO huawei_hbase_sink_table\n" +
"SELECT " +
" CONCAT( CAST( (MOD(CAST(DATE_FORMAT(`date`,'yyyyMMdd') AS INT),10)) AS STRING ) , '~' , DATE_FORMAT(`date`,'yyyyMMdd'),'~' ,trade_no) ," +
" ROW(" +
"saasid ,\n" +
"marketid ,\n" +
"operator_id ,\n" +
"out_id ,\n" +
"paymentplatform ,\n" +
"serveicetype ,\n" +
"total_fee ,\n" +
"status ,\n" +
"trad_desc ,\n" +
"`date` ,\n" +
"systemdate ,\n" +
"fund_bill_list ,\n" +
"buyer_logon_id ,\n" +
"buyer_user_id ,\n" +
"cashier ,\n" +
"seller_id ,\n" +
"trade_no ,\n" +
"is_subscribe ,\n" +
"isbalance ,\n" +
"posbatch ,\n" +
"invoiceno ,\n" +
"deductionfee ,\n" +
"merchantdiscount ,\n" +
"otherdescount ,\n" +
"barcode ,\n" +
"goodstag ,\n" +
"rds_id ,\n" +
"rds_name ,\n" +
"mqstatus ,\n" +
"trade_type ,\n" +
"notify_url ,\n" +
"currency ,\n" +
"settlementid "+
" ) FROM huawei_kafka_binlog_source_table";
public static void main(String[] args) throws Exception {
SessionClient session = new SessionClient(AccountPayKafkaBinlogToHBase.class.getName(), "123.60.47.52", 8083, "streaming");
System.out.println(session.submitStatement(CREATE_SOURCE_SQL).getResults());
System.out.println(session.submitStatement(CREATE_SINK_SQL).getResults());
System.out.println(session.submitStatement(CREATE_JOB_SQL).getResults());
session.close();
}
}
package com.miya.huawei;
import com.miya.common.SessionClient;
public class KafkaMiniProgramLogsToHBase {
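// NOTE: still wired to the rds-binlog-test topic and test brokers used in App.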
private static final String CREATE_SOURCE_SQL = "CREATE TABLE huawei_kafka_miniprogramlogs_source_table (\n" +
" `mysql_type` STRING,\n" +
" `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'rds-binlog-test',\n" +
" 'properties.bootstrap.servers' = '172.16.7.171:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json'\n" +
")";
private static final String CREATE_SINK_SQL = "CREATE TABLE huawei_hbase_sink_table (\n" +
" rowkey INT,\n" +
" family1 ROW<q1 STRING>,\n" +
" PRIMARY KEY (rowkey) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'mytable',\n" +
" 'zookeeper.quorum' = '172.16.128.133:2181'\n" +
")" ;
private static final String CREATE_JOB_SQL = "INSERT INTO huawei_hbase_sink_table\n" +
"SELECT CAST(mysql_type as INT),ROW(mysql_type) FROM huawei_kafka_miniprogramlogs_source_table";
public static void main(String[] args) throws Exception {
SessionClient session = new SessionClient(KafkaMiniProgramLogsToHBase.class.getName(), "123.60.47.52", 8083, "streaming");
System.out.println(session.submitStatement(CREATE_SOURCE_SQL).getResults());
System.out.println(session.submitStatement(CREATE_SINK_SQL).getResults());
System.out.println(session.submitStatement(CREATE_JOB_SQL).getResults());
session.close();
}
}
package com.miya.huawei;
import com.miya.common.SessionClient;
public class ReturnPayKafkaBinlogToHBase {
private static final String CREATE_SOURCE_SQL = "CREATE TABLE huawei_kafka_binlog_source_table (\n" +
"saasid STRING ,\n" +
"marketid STRING ,\n" +
"operator_id STRING ,\n" +
"out_id STRING ,\n" +
"out_request_no STRING ,\n" +
"paymentplatform STRING ,\n" +
"serveicetype STRING ,\n" +
"total_fee STRING ,\n" +
"status STRING ,\n" +
"trad_desc STRING ,\n" +
"date STRING ,\n" +
"miyadate DATETIME,\n" +
"retotal_fee STRING ,\n" +
"cashier STRING ,\n" +
"posbatch STRING ,\n" +
"invoiceno STRING ,\n" +
"fundbilllist STRING ,\n" +
"seller_id STRING ,\n" +
"trade_no STRING ,\n" +
"orderfee STRING ,\n" +
"rds_id STRING ,\n" +
"rds_name STRING ,\n" +
"currency STRING ,\n" +
"settlementid STRING ,\n" +
"trade_type STRING ,\n" +
"deductionfee STRING ,\n" +
"merchantdiscount STRING ,\n" +
"otherdescount STRING "+
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'returnpay',\n" +
" 'properties.bootstrap.servers' = '10.0.1.227:9092,10.0.1.226:9092,10.0.1.228:9092',\n" +
" 'properties.group.id' = 'flink-returnpay-binlog',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true' ," +
" 'format' = 'json'\n" +
")";
private static final String CREATE_SINK_SQL = "CREATE TABLE huawei_hbase_sink_table (\n" +
" rowkey STRING,\n" +
" returnpay_columns ROW<saasid STRING,\n" +
" marketid STRING ,\n" +
" operator_id STRING ,\n" +
" out_id STRING ,\n" +
" out_request_no STRING ,\n" +
" paymentplatform STRING ,\n" +
" serveicetype STRING ,\n" +
" total_fee STRING ,\n" +
" status STRING ,\n" +
" trad_desc STRING ,\n" +
" date STRING ,\n" +
" miyadate DATETIME,\n" +
" retotal_fee STRING ,\n" +
" cashier STRING ,\n" +
" posbatch STRING ,\n" +
" invoiceno STRING ,\n" +
" fundbilllist STRING ,\n" +
" seller_id STRING ,\n" +
" trade_no STRING ,\n" +
" orderfee STRING ,\n" +
" rds_id STRING ,\n" +
" rds_name STRING ,\n" +
" currency STRING ,\n" +
" settlementid STRING ,\n" +
" trade_type STRING ,\n" +
" deductionfee STRING ,\n" +
" merchantdiscount STRING ,\n" +
" otherdescount STRING ," +
" PRIMARY KEY (rowkey) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'original:returnpay',\n" +
" 'zookeeper.quorum' = '172.16.130.64:2181,172.16.128.15:2181,172.16.129.113:2181'" +
")" ;
private static final String CREATE_JOB_SQL = "INSERT INTO huawei_hbase_sink_table\n" +
"SELECT " +
" CONCAT( CAST( (MOD(CAST(DATE_FORMAT(`date`,'yyyyMMdd') AS INT),10)) AS STRING ) , '~' , DATE_FORMAT(`date`,'yyyyMMdd'),'~' ,trade_no) ," +
" ROW(" +
"saasid ,\n" +
"marketid ,\n" +
"operator_id ,\n" +
"out_id ,\n" +
"out_request_no ,\n" +
"paymentplatform ,\n" +
"serveicetype ,\n" +
"total_fee ,\n" +
"status ,\n" +
"trad_desc ,\n" +
"date ,\n" +
"miyadate ,\n" +
"retotal_fee ,\n" +
"cashier ,\n" +
"posbatch ,\n" +
"invoiceno ,\n" +
"fundbilllist ,\n" +
"seller_id ,\n" +
"trade_no ,\n" +
"orderfee ,\n" +
"rds_id ,\n" +
"rds_name ,\n" +
"currency ,\n" +
"settlementid ,\n" +
"trade_type ,\n" +
"deductionfee ,\n" +
"merchantdiscount ,\n" +
"otherdescount \n" +
" ) FROM huawei_kafka_binlog_source_table";
public static void main(String[] args) throws Exception {
SessionClient session = new SessionClient(ReturnPayKafkaBinlogToHBase.class.getName(), "123.60.47.52", 8083, "streaming");
System.out.println(session.submitStatement(CREATE_SOURCE_SQL).getResults());
System.out.println(session.submitStatement(CREATE_SINK_SQL).getResults());
System.out.println(session.submitStatement(CREATE_JOB_SQL).getResults());
session.close();
}
}
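# Configuration for the Ververica flink-sql-gateway instance that the SessionClient classes above talk to.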
server:
bind-address: 172.16.5.230
address: 172.16.5.230
port: 8083
jvm_args: "-Xmx2018m -Xms1024m"
session:
idle-timeout: 1d
check-interval: 1h
max-count: 1000000
tables:
- name: accountpay
type: source-table
update-mode: append
connector:
property-version: 1
type: kafka
version: "0.11"
topic: rds-binlog-test
startup-mode: earliest-offset
properties:
bootstrap.servers: 172.16.7.171:9092
group.id: testGroup
format:
property-version: 1
type: json
schema: "ROW<mysql_type VARCHAR>"
schema:
- name: mysql_type
data-type: VARCHAR
execution:
parallelism: 1
max-parallelism: 16
current-catalog: default_catalog
current-database: default_database
configuration:
table.optimizer.join-reorder-enabled: true
table.exec.spill-compression.enabled: true
table.exec.spill-compression.block-size: 128kb
deployment:
response-timeout: 5000
package com.miya;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
/**
* Unit test for simple App.
*/
public class AppTest
{
/**
* Rigorous Test :-)
*/
@Test
public void shouldAnswerWithTrue()
{
assertTrue( true );
}
}