xiebaofa / flink-sql-gateway-task · Commits

Commit 0f8c85ee authored May 19, 2021 by xiebaofa
Remove dead code
parent 1adf9100
Showing 3 changed files with 6 additions and 65 deletions

src/main/java/com/miya/ali/AccountPayKafkaBinlogToHBase.java   +3 -5
src/main/java/com/miya/ali/ReturnPayKafkaBinlogToHBase.java    +3 -6
src/main/java/com/miya/common/RestUtils.java                   +0 -54
src/main/java/com/miya/ali/AccountPayKafkaBinlogToHBase.java (view file @ 0f8c85ee)
@@ -4,7 +4,6 @@ import com.miya.common.SessionClient;
 import java.util.Arrays;
 import java.util.List;
-import java.util.UUID;
 public class AccountPayKafkaBinlogToHBase {

@@ -117,7 +116,6 @@ public class AccountPayKafkaBinlogToHBase {
     String CREATE_JOB_SQL = "INSERT INTO huawei_hbase_sink_table\n" +
             "SELECT " +
             " CONCAT( CAST( (MOD(CAST(DATE_FORMAT(systemdate,'yyyyMMdd') AS INT),10)) AS STRING ) , '~' , DATE_FORMAT(systemdate,'yyyyMMdd'),'~' , out_id) ," +
-            // " '324234234324234' ," +
             " ROW(" +
             "saasid ,\n" +
             "marketid ,\n" +

@@ -168,10 +166,10 @@ public class AccountPayKafkaBinlogToHBase {
         for (String rds : RDS_LIST) {
-            System.out.println(session.submitStatement(createSourceSql(rds)).getResults());
+            session.submitStatement(createSourceSql(rds)).getResults();
         }
-        System.out.println(session.submitStatement(createSinkSql()).getResults());
+        session.submitStatement(createSinkSql()).getResults();
-        System.out.println(session.submitStatement(createJobSql()).getResults());
+        session.submitStatement(createJobSql()).getResults();
         session.close();
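The CONCAT expression in CREATE_JOB_SQL composes the HBase rowkey as salt~date~out_id, where the salt is the last digit of the yyyyMMdd date, spreading writes across ten key prefixes; ReturnPayKafkaBinlogToHBase below builds the same key from miyadate instead of systemdate. A small worked example in plain Java of what that expression evaluates to; the class name and sample values (systemdate 2021-05-19, out_id "A001") are illustrative only, not taken from this repository.

// Worked example (illustrative values only) of the rowkey expression
//   CONCAT(CAST(MOD(CAST(DATE_FORMAT(systemdate,'yyyyMMdd') AS INT),10) AS STRING),
//          '~', DATE_FORMAT(systemdate,'yyyyMMdd'), '~', out_id)
public class RowkeySaltExample {
    public static void main(String[] args) {
        String day = "20210519";                // DATE_FORMAT(systemdate,'yyyyMMdd') for 2021-05-19
        String outId = "A001";                  // made-up out_id
        int salt = Integer.parseInt(day) % 10;  // MOD(CAST(... AS INT), 10) -> 9
        String rowkey = salt + "~" + day + "~" + outId;
        System.out.println(rowkey);             // prints 9~20210519~A001
    }
}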
src/main/java/com/miya/ali/ReturnPayKafkaBinlogToHBase.java (view file @ 0f8c85ee)
@@ -4,7 +4,6 @@ import com.miya.common.SessionClient;
 import java.util.Arrays;
 import java.util.List;
-import java.util.UUID;
 public class ReturnPayKafkaBinlogToHBase {

@@ -107,8 +106,6 @@ public class ReturnPayKafkaBinlogToHBase {
     String CREATE_JOB_SQL = "INSERT INTO huawei_hbase_sink_table\n" +
             "SELECT " +
             " CONCAT( CAST( (MOD(CAST(DATE_FORMAT(miyadate,'yyyyMMdd') AS INT),10)) AS STRING ) , '~' , DATE_FORMAT(miyadate,'yyyyMMdd'),'~' , out_id) ," +
-            // " CONCAT(out_id,'~',CAST( rand() as STRING)) ," +
-            // " UUID() ," +
             " ROW(" +
             "saasid ,\n" +
             "marketid ,\n" +

@@ -154,10 +151,10 @@ public class ReturnPayKafkaBinlogToHBase {
         for (String rds : RDS_LIST) {
-            System.out.println(session.submitStatement(createSourceSql(rds)).getResults());
+            session.submitStatement(createSourceSql(rds)).getResults();
         }
-        System.out.println(session.submitStatement(createSinkSql()).getResults());
+        session.submitStatement(createSinkSql()).getResults();
-        System.out.println(session.submitStatement(createJobSql()).getResults());
+        session.submitStatement(createJobSql()).getResults();
         session.close();
src/main/java/com/miya/common/RestUtils.java (deleted, 100644 → 0; view file @ 1adf9100)
package com.miya.common;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.ververica.flink.table.gateway.rest.result.ConstantNames;
import com.ververica.flink.table.gateway.rest.result.ResultSet;
import org.apache.flink.api.common.JobID;
import org.apache.flink.types.Either;

/**
 * Utility class to handle REST data structures.
 */
public class RestUtils {

    public static JobID getJobID(ResultSet resultSet) {
        if (resultSet.getColumns().size() != 1) {
            throw new IllegalArgumentException("Should contain only one column. This is a bug.");
        }
        if (resultSet.getColumns().get(0).getName().equals(ConstantNames.JOB_ID)) {
            String jobId = (String) resultSet.getData().get(0).getField(0);
            return JobID.fromHexString(jobId);
        } else {
            throw new IllegalArgumentException("Column name should be " + ConstantNames.JOB_ID + ". This is a bug.");
        }
    }

    public static Either<JobID, ResultSet> getEitherJobIdOrResultSet(ResultSet resultSet) {
        if (resultSet.getColumns().size() == 1 &&
            resultSet.getColumns().get(0).getName().equals(ConstantNames.JOB_ID)) {
            String jobId = (String) resultSet.getData().get(0).getField(0);
            return Either.Left(JobID.fromHexString(jobId));
        } else {
            return Either.Right(resultSet);
        }
    }
}
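For context on what this commit removes: the helper turned a gateway ResultSet whose single JOB_ID column marks an asynchronously submitted job into either a JobID or the inline result set. A hedged usage sketch follows; it is not code from this repository. Only RestUtils, ResultSet, JobID and Either come from the deleted file, and the surrounding class and method are hypothetical.

import com.miya.common.RestUtils;
import com.ververica.flink.table.gateway.rest.result.ResultSet;
import org.apache.flink.api.common.JobID;
import org.apache.flink.types.Either;

// Hypothetical caller of the deleted helper, shown only to illustrate its contract.
class RestUtilsUsageSketch {
    static void handle(ResultSet resultSet) {
        Either<JobID, ResultSet> outcome = RestUtils.getEitherJobIdOrResultSet(resultSet);
        if (outcome.isLeft()) {
            // INSERT-style statements come back as a job id
            System.out.println("Submitted job " + outcome.left());
        } else {
            // DDL and queries come back as rows in the result set itself
            System.out.println(outcome.right().getData());
        }
    }
}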