Commit 7c0050a4 authored by houpengpeng

Miya ODPS UDF project

<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/examples" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="jdk" jdkName="1.8" jdkType="JavaSDK" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="scala-sdk-2.11.8" level="application" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-sdk-core:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-sdk-commons:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.1.1" level="project" />
<orderEntry type="library" name="Maven: org.aspectj:aspectjrt:1.8.2" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.5" level="project" />
<orderEntry type="library" name="Maven: commons-codec:commons-codec:1.9" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.13" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.13" level="project" />
<orderEntry type="library" name="Maven: com.google.code.gson:gson:2.2.4" level="project" />
<orderEntry type="library" name="Maven: net.sourceforge.javacsv:javacsv:2.0" level="project" />
<orderEntry type="library" name="Maven: org.xerial.snappy:snappy-java:1.1.1.6" level="project" />
<orderEntry type="library" name="Maven: com.google.protobuf:protobuf-java:2.4.1" level="project" />
<orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
<orderEntry type="library" name="Maven: javax.mail:mail:1.4.7" level="project" />
<orderEntry type="library" name="Maven: javax.activation:activation:1.1" level="project" />
<orderEntry type="library" name="Maven: javax.xml.bind:jaxb-api:2.2.7" level="project" />
<orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-core:2.2.7" level="project" />
<orderEntry type="library" name="Maven: com.sun.istack:istack-commons-runtime:2.16" level="project" />
<orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-impl:2.2.7" level="project" />
<orderEntry type="library" name="Maven: com.sun.xml.fastinfoset:FastInfoset:1.2.12" level="project" />
<orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" level="project" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-sdk-udf:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-udf-local:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-common-local:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-compress:1.4" level="project" />
<orderEntry type="library" name="Maven: org.tukaani:xz:1.0" level="project" />
<orderEntry type="library" name="Maven: commons-cli:commons-cli:1.3.1" level="project" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-udf-example:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: commons-lang:commons-lang:2.6" level="project" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-sdk-mapred:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-mapred-local:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-mapred-bridge:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-sdk-lot:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-lot-proto:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: org.apache.velocity:velocity:1.7" level="project" />
<orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.1" level="project" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-sdk-graph:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:15.0" level="project" />
<orderEntry type="library" name="Maven: com.aliyun.odps:odps-graph-local:0.30.8-public" level="project" />
<orderEntry type="library" name="Maven: junit:junit:4.12" level="project" />
<orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
<orderEntry type="library" name="Maven: com.alibaba:fastjson:1.2.36" level="project" />
<orderEntry type="library" name="Maven: org.json:json:20180130" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-library:2.11.12" level="project" />
</component>
</module>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun.odps.myJava</groupId>
<artifactId>AprilModule</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com.alibaba</pattern>
<shadedPattern>shaded.com.alibaba</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<sdk.version>0.30.8-public</sdk.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
</project>
package com.aliyun.odps.examples;
import com.aliyun.odps.Odps;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.local.common.WareHouse;
import java.io.File;
public class TestUtil {
private final static String accessId = "accessId";
private final static String accessKey = "accessKey";
private final static String endpoint = "endpoint";
private final static String defaultProject = "example_project";
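// Note: accessId/accessKey/endpoint above are placeholder values; replace them
// with real credentials to run the examples against an actual ODPS project.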
static Odps odps;
static {
Account account = new AliyunAccount(accessId, accessKey);
odps = new Odps(account);
odps.setEndpoint(endpoint);
odps.setDefaultProject(defaultProject);
}
public static String join(Object[] obj) {
if (obj == null) {
return null;
}
StringBuilder sb = new StringBuilder();
for (int i = 0; i < obj.length; i++) {
if (sb.length() > 0) {
sb.append(",");
}
sb.append(obj[i]);
}
return sb.toString();
}
public static Odps getOdps() {
return odps;
}
public static WareHouse initWarehouse() {
//init the warehouse in project dir
File exampleProjectDir = new File("warehouse" + File.separator + defaultProject);
if (exampleProjectDir.exists()) {
return WareHouse.getInstance("warehouse");
} else {
exampleProjectDir = new File("../warehouse" + File.separator + defaultProject);
if (exampleProjectDir.exists()) {
return WareHouse.getInstance("../warehouse");
}
}
throw new RuntimeException("warehouse dir does not exist");
}
}
package com.aliyun.odps.examples.graph;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.Aggregator;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Tuple;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.io.WritableRecord;
/**
* Set resources arguments:
* kmeans_centers
* Set program arguments:
* kmeans_in kmeans_out
*/
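// The "kmeans_centers" resource is read in KmeansAggregator.createInitialValue()
// below as plain text: one initial center per line, coordinates separated by
// commas (an illustrative line for 2-D points: "1.0,2.0").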
public class Kmeans {
private final static Log LOG = LogFactory.getLog(Kmeans.class);
public static class KmeansVertex extends Vertex<Text, Tuple, NullWritable, NullWritable> {
@Override
public void compute(ComputeContext<Text, Tuple, NullWritable, NullWritable> context,
Iterable<NullWritable> messages) throws IOException {
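// Every superstep, each vertex hands its data point (the Tuple value) to the
// aggregator, which assigns it to the nearest current center.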
context.aggregate(getValue());
}
}
public static class KmeansVertexReader extends
GraphLoader<Text, Tuple, NullWritable, NullWritable> {
@Override
public void load(LongWritable recordNum, WritableRecord record,
MutationContext<Text, Tuple, NullWritable, NullWritable> context) throws IOException {
KmeansVertex vertex = new KmeansVertex();
vertex.setId(new Text(String.valueOf(recordNum.get())));
vertex.setValue(new Tuple(record.getAll()));
context.addVertexRequest(vertex);
}
}
public static class KmeansAggrValue implements Writable {
Tuple centers = new Tuple();
Tuple sums = new Tuple();
Tuple counts = new Tuple();
public void write(DataOutput out) throws IOException {
centers.write(out);
sums.write(out);
counts.write(out);
}
public void readFields(DataInput in) throws IOException {
centers = new Tuple();
centers.readFields(in);
sums = new Tuple();
sums.readFields(in);
counts = new Tuple();
counts.readFields(in);
}
@Override
public String toString() {
return "centers " + centers.toString() + ", sums " + sums.toString() + ", counts "
+ counts.toString();
}
}
public static class KmeansAggregator extends Aggregator<KmeansAggrValue> {
@SuppressWarnings("rawtypes")
@Override
public KmeansAggrValue createInitialValue(WorkerContext context) throws IOException {
KmeansAggrValue aggrVal = null;
if (context.getSuperstep() == 0) {
aggrVal = new KmeansAggrValue();
aggrVal.centers = new Tuple();
aggrVal.sums = new Tuple();
aggrVal.counts = new Tuple();
byte[] centers = context.readCacheFile("kmeans_centers");
String[] lines = new String(centers).split("\n");
for (int i = 0; i < lines.length; i++) {
String[] ss = lines[i].split(",");
Tuple center = new Tuple();
Tuple sum = new Tuple();
for (int j = 0; j < ss.length; ++j) {
center.append(new DoubleWritable(Double.valueOf(ss[j].trim())));
sum.append(new DoubleWritable(0.0));
}
LongWritable count = new LongWritable(0);
aggrVal.sums.append(sum);
aggrVal.counts.append(count);
aggrVal.centers.append(center);
}
} else {
aggrVal = (KmeansAggrValue) context.getLastAggregatedValue(0);
}
return aggrVal;
}
@Override
public void aggregate(KmeansAggrValue value, Object item) {
int min = 0;
double mindist = Double.MAX_VALUE;
Tuple point = (Tuple) item;
for (int i = 0; i < value.centers.size(); i++) {
Tuple center = (Tuple) value.centers.get(i);
// use Euclidean Distance, no need to calculate sqrt
double dist = 0.0d;
for (int j = 0; j < center.size(); j++) {
double v = ((DoubleWritable) point.get(j)).get() - ((DoubleWritable) center.get(j)).get();
dist += v * v;
}
if (dist < mindist) {
mindist = dist;
min = i;
}
}
// update sum and count
Tuple sum = (Tuple) value.sums.get(min);
for (int i = 0; i < point.size(); i++) {
DoubleWritable s = (DoubleWritable) sum.get(i);
s.set(s.get() + ((DoubleWritable) point.get(i)).get());
}
LongWritable count = (LongWritable) value.counts.get(min);
count.set(count.get() + 1);
}
@Override
public void merge(KmeansAggrValue value, KmeansAggrValue partial) {
for (int i = 0; i < value.sums.size(); i++) {
Tuple sum = (Tuple) value.sums.get(i);
Tuple that = (Tuple) partial.sums.get(i);
for (int j = 0; j < sum.size(); j++) {
DoubleWritable s = (DoubleWritable) sum.get(j);
s.set(s.get() + ((DoubleWritable) that.get(j)).get());
}
}
for (int i = 0; i < value.counts.size(); i++) {
LongWritable count = (LongWritable) value.counts.get(i);
count.set(count.get() + ((LongWritable) partial.counts.get(i)).get());
}
}
@SuppressWarnings("rawtypes")
@Override
public boolean terminate(WorkerContext context, KmeansAggrValue value) throws IOException {
// compute new centers
Tuple newCenters = new Tuple(value.sums.size());
for (int i = 0; i < value.sums.size(); i++) {
Tuple sum = (Tuple) value.sums.get(i);
Tuple newCenter = new Tuple(sum.size());
LongWritable c = (LongWritable) value.counts.get(i);
for (int j = 0; j < sum.size(); j++) {
DoubleWritable s = (DoubleWritable) sum.get(j);
double val = s.get() / c.get();
newCenter.set(j, new DoubleWritable(val));
// reset sum for next iteration
s.set(0.0d);
}
// reset count for next iteration
c.set(0);
newCenters.set(i, newCenter);
}
// update centers
Tuple oldCenters = value.centers;
value.centers = newCenters;
LOG.info("old centers: " + oldCenters + ", new centers: " + newCenters);
// compare new/old centers
boolean converged = true;
for (int i = 0; i < value.centers.size() && converged; i++) {
Tuple oldCenter = (Tuple) oldCenters.get(i);
Tuple newCenter = (Tuple) newCenters.get(i);
double sum = 0.0d;
for (int j = 0; j < newCenter.size(); j++) {
double v =
((DoubleWritable) newCenter.get(j)).get() - ((DoubleWritable) oldCenter.get(j)).get();
sum += v * v;
}
double dist = Math.sqrt(sum);
LOG.info("old center: " + oldCenter + ", new center: " + newCenter + ", dist: " + dist);
// converge threshold for each center: 0.05
converged = dist < 0.05d;
}
if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
// converged or reach max iteration, output centers
for (int i = 0; i < value.centers.size(); i++) {
context.write(((Tuple) value.centers.get(i)).toArray());
}
// true means to terminate iteration
return true;
}
// false means to continue iteration
return false;
}
}
private static void printUsage() {
System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
System.exit(-1);
}
public static void main(String[] args) throws IOException {
if (args.length < 2)
printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(KmeansVertexReader.class);
job.setRuntimePartitioning(false);
job.setVertexClass(KmeansVertex.class);
job.setAggregatorClass(KmeansAggregator.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)
job.setMaxIteration(Integer.parseInt(args[2]));
long start = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in " + (System.currentTimeMillis() - start) / 1000.0
+ " seconds");
}
}
package com.aliyun.odps.examples.graph;
import java.io.IOException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.io.WritableRecord;
/**
* Set program arguments:
* pagerank_in pagerank_out
*
*/
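// Input format, as parsed by PageRankVertexReader below: column 0 is the
// vertex id, and each remaining non-null column is the id of a destination
// vertex of an outgoing edge.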
public class PageRank {
public static class PageRankVertex extends
Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void compute(ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
Iterable<DoubleWritable> messages) throws IOException {
if (context.getSuperstep() == 0) {
setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
} else if (context.getSuperstep() >= 1) {
double sum = 0;
for (DoubleWritable msg : messages) {
sum += msg.get();
}
DoubleWritable vertexValue =
new DoubleWritable((0.15f / context.getTotalNumVertices()) + 0.85f * sum);
setValue(vertexValue);
}
if (hasEdges()) {
context.sendMessageToNeighbors(this, new DoubleWritable(getValue().get()
/ getEdges().size()));
}
}
@Override
public void cleanup(WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
throws IOException {
context.write(getId(), getValue());
}
}
public static class PageRankVertexReader extends
GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void load(LongWritable recordNum, WritableRecord record,
MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
throws IOException {
PageRankVertex vertex = new PageRankVertex();
vertex.setValue(new DoubleWritable(0));
vertex.setId((Text) record.get(0));
System.out.println(record.get(0));
for (int i = 1; i < record.size(); i++) {
Writable edge = record.get(i);
System.out.println(edge.toString());
if (!(edge.equals(NullWritable.get()))) {
vertex.addEdge(new Text(edge.toString()), NullWritable.get());
}
}
System.out.println("vertex edgs size: " + (vertex.hasEdges() ? vertex.getEdges().size() : 0));
context.addVertexRequest(vertex);
}
}
private static void printUsage() {
System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
System.exit(-1);
}
public static void main(String[] args) throws IOException {
if (args.length < 2)
printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(PageRankVertexReader.class);
job.setVertexClass(PageRankVertex.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)
job.setMaxIteration(Integer.parseInt(args[2]));
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0
+ " seconds");
}
}
package com.aliyun.odps.examples.graph;
import java.io.IOException;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.data.TableInfo;
/**
* Set program arguments:
* 1 sssp_in sssp_out
*
*/
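// Input format, as parsed by SSSPVertexReader below: column 0 is the vertex
// id, and column 1 encodes the adjacency list as "destId:weight" pairs
// separated by ';' (an illustrative value: "2:2;3:1;4:4").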
public class SSSP {
public static final String START_VERTEX = "sssp.start.vertex.id";
public static class SSSPVertex extends
Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
private static long startVertexId = -1;
public SSSPVertex() {
this.setValue(new LongWritable(Long.MAX_VALUE));
}
public boolean isStartVertex(
ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
if (startVertexId == -1) {
String s = context.getConfiguration().get(START_VERTEX);
startVertexId = Long.parseLong(s);
}
return getId().get() == startVertexId;
}
@Override
public void compute(
ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
Iterable<LongWritable> messages) throws IOException {
long minDist = isStartVertex(context) ? 0 : Integer.MAX_VALUE;
for (LongWritable msg : messages) {
if (msg.get() < minDist) {
minDist = msg.get();
}
}
if (minDist < this.getValue().get()) {
this.setValue(new LongWritable(minDist));
if (hasEdges()) {
for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
context
.sendMessage(e.getDestVertexId(), new LongWritable(minDist + e.getValue().get()));
}
}
} else {
voteToHalt();
}
}
@Override
public void cleanup(
WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
throws IOException {
context.write(getId(), getValue());
}
}
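// Combines messages addressed to the same vertex by keeping only the minimum
// candidate distance, reducing message traffic between workers.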
public static class MinLongCombiner extends Combiner<LongWritable, LongWritable> {
@Override
public void combine(LongWritable vertexId, LongWritable combinedMessage,
LongWritable messageToCombine) throws IOException {
if (combinedMessage.get() > messageToCombine.get()) {
combinedMessage.set(messageToCombine.get());
}
}
}
public static class SSSPVertexReader extends
GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
@Override
public void load(LongWritable recordNum, WritableRecord record,
MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
throws IOException {
SSSPVertex vertex = new SSSPVertex();
vertex.setId((LongWritable) record.get(0));
String[] edges = record.get(1).toString().split(";");
for (int i = 0; i < edges.length; i++) {
String[] ss = edges[i].split(":");
vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
new LongWritable(Long.parseLong(ss[1])));
}
context.addVertexRequest(vertex);
}
}
public static void main(String[] args) throws IOException {
if (args.length < 3) {
System.out.println("Usage: <startnode> <input> <output>");
System.exit(-1);
}
GraphJob job = new GraphJob();
job.setGraphLoaderClass(SSSPVertexReader.class);
job.setVertexClass(SSSPVertex.class);
job.setCombinerClass(MinLongCombiner.class);
job.set(START_VERTEX, args[0]);
job.addInput(TableInfo.builder().tableName(args[1]).build());
job.addOutput(TableInfo.builder().tableName(args[2]).build());
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0
+ " seconds");
}
}
package com.aliyun.odps.examples.mr;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
/*
* This example shows how to read file resources in a MapReduce program.
* It is mainly intended for debugging in local mode. To run it in the online
* environment, remove the statement "job.setResources("file_resource.txt");"
* from the main method.
*
* Usage:
* Set Resource arguments:
* file_resource.txt
* Set program arguments:
* wc_in1 rs_out
*/
public class Resource {
public static class TokenizerMapper extends MapperBase {
Record result;
@Override
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
long fileResourceLineCount = 0;
InputStream in = context.readResourceFileAsStream("file_resource.txt");
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line;
while ((line = br.readLine()) != null) {
fileResourceLineCount++;
}
br.close();
result.set(0, "file_resource_line_count");
result.set(1, fileResourceLineCount);
context.write(result);
Iterator<Record> it = context.readResourceTable("table_resource1");
long tableResourceRecordCount = 0;
while (it.hasNext()) {
it.next();
++tableResourceRecordCount;
}
result.set(0, "table_resource1_record_count");
result.set(1, tableResourceRecordCount);
context.write(result);
it = context.readResourceTable("table_resource2");
tableResourceRecordCount = 0;
while (it.hasNext()) {
it.next();
++tableResourceRecordCount;
}
result.set(0, "table_resource2_record_count");
result.set(1, tableResourceRecordCount);
context.write(result);
}
}
public static void main(String[] args) throws Exception {
JobConf job = new JobConf();
job.setMapperClass(TokenizerMapper.class);
job.setNumReduceTasks(0);
InputUtils.addTable(TableInfo.builder().tableName("wc_in1").build(), job);
OutputUtils.addTable(TableInfo.builder().tableName("rs_out").build(), job);
JobClient.runJob(job);
}
}
package com.aliyun.odps.examples.mr;
import com.aliyun.odps.counter.Counter;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import java.io.IOException;
import java.util.Iterator;
/*
* This example shows the basic structure of a MapReduce program.
*/
public class WordCount {
public static class TokenizerMapper extends MapperBase {
Record word;
Record one;
Counter gCnt;
@Override
public void setup(TaskContext context) throws IOException {
word = context.createMapOutputKeyRecord();
one = context.createMapOutputValueRecord();
one.set(new Object[] {1L});
gCnt = context.getCounter("MyCounters", "global_counts");
}
@Override
public void map(long recordNum, Record record, TaskContext context) throws IOException {
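// Tokenize every column of the input record on whitespace and emit a
// (word, 1) pair for each token.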
for (int i = 0; i < record.getColumnCount(); i++) {
String[] words = record.get(i).toString().split("\\s+");
for (String w : words) {
word.set(new Object[] {w});
Counter cnt = context.getCounter("MyCounters", "map_outputs");
cnt.increment(1);
gCnt.increment(1);
context.write(word, one);
}
}
}
}
/**
* A combiner class that combines map output by summing the values.
*/
public static class SumCombiner extends ReducerBase {
private Record count;
@Override
public void setup(TaskContext context) throws IOException {
count = context.createMapOutputValueRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
long c = 0;
while (values.hasNext()) {
Record val = values.next();
c += (Long) val.get(0);
}
count.set(0, c);
context.write(key, count);
}
}
/**
* A reducer class that just emits the sum of the input values.
*/
public static class SumReducer extends ReducerBase {
private Record result;
Counter gCnt;
@Override
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
gCnt = context.getCounter("MyCounters", "global_counts");
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
long count = 0;
while (values.hasNext()) {
Record val = values.next();
count += (Long) val.get(0);
}
result.set(0, key.get(0));
result.set(1, count);
Counter cnt = context.getCounter("MyCounters", "reduce_outputs");
cnt.increment(1);
gCnt.increment(1);
context.write(result);
}
}
public static void main(String[] args) throws Exception {
JobConf job = new JobConf();
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(SumCombiner.class);
job.setReducerClass(SumReducer.class);
job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
InputUtils.addTable(TableInfo.builder().tableName("wc_in1").cols(new String[] {"col2", "col3"})
.build(), job);
InputUtils.addTable(TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build(), job);
OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
RunningJob rj = JobClient.runJob(job);
rj.waitForCompletion();
}
}
package com.aliyun.odps.examples.mr.test;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.examples.TestUtil;
import com.aliyun.odps.examples.mr.WordCount;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.unittest.*;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import junit.framework.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class WordCountTest extends MRUnitTest {
// Schemas of the input and output tables
private final static String INPUT_SCHEMA = "a:string,b:string";
private final static String OUTPUT_SCHEMA = "k:string,v:bigint";
private JobConf job;
public WordCountTest() throws Exception {
TestUtil.initWarehouse();
// Prepare the job configuration
job = new JobConf();
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.SumCombiner.class);
job.setReducerClass(WordCount.SumReducer.class);
job.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
job.setMapOutputValueSchema(SchemaUtils.fromString("value:bigint"));
InputUtils.addTable(TableInfo.builder().tableName("wc_in").build(), job);
OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
}
@SuppressWarnings("deprecation")
@Test
public void testMap() throws IOException, ClassNotFoundException, InterruptedException {
MapUTContext mapContext = new MapUTContext();
mapContext.setInputSchema(INPUT_SCHEMA);
mapContext.setOutputSchema(OUTPUT_SCHEMA, job);
// Prepare the test data
Record record = mapContext.createInputRecord();
record.set(new Text[] {new Text("hello"), new Text("c")});
mapContext.addInputRecord(record);
record = mapContext.createInputRecord();
record.set(new Text[] {new Text("hello"), new Text("java")});
mapContext.addInputRecord(record);
// Run the map phase
TaskOutput output = runMapper(job, mapContext);
// Verify the map output (the combiner has run): 3 key/value pairs
List<KeyValue<Record, Record>> kvs = output.getOutputKeyValues();
Assert.assertEquals(3, kvs.size());
Assert.assertEquals(new KeyValue<String, Long>(new String("c"), new Long(1)),
new KeyValue<String, Long>((String) (kvs.get(0).getKey().get(0)), (Long) (kvs.get(0)
.getValue().get(0))));
Assert.assertEquals(new KeyValue<String, Long>(new String("hello"), new Long(2)),
new KeyValue<String, Long>((String) (kvs.get(1).getKey().get(0)), (Long) (kvs.get(1)
.getValue().get(0))));
Assert.assertEquals(new KeyValue<String, Long>(new String("java"), new Long(1)),
new KeyValue<String, Long>((String) (kvs.get(2).getKey().get(0)), (Long) (kvs.get(2)
.getValue().get(0))));
}
@Test
public void testReduce() throws IOException, ClassNotFoundException, InterruptedException {
ReduceUTContext context = new ReduceUTContext();
context.setOutputSchema(OUTPUT_SCHEMA, job);
// Prepare the test data
Record key = context.createInputKeyRecord(job);
Record value = context.createInputValueRecord(job);
key.set(0, "world");
value.set(0, new Long(1));
context.addInputKeyValue(key, value);
key.set(0, "hello");
value.set(0, new Long(1));
context.addInputKeyValue(key, value);
key.set(0, "hello");
value.set(0, new Long(1));
context.addInputKeyValue(key, value);
key.set(0, "odps");
value.set(0, new Long(1));
context.addInputKeyValue(key, value);
// Run the reduce phase
TaskOutput output = runReducer(job, context);
// Verify the reduce output: 3 records
List<Record> records = output.getOutputRecords();
Assert.assertEquals(3, records.size());
Assert.assertEquals(new String("hello"), records.get(0).get("k"));
Assert.assertEquals(new Long(2), records.get(0).get("v"));
Assert.assertEquals(new String("odps"), records.get(1).get("k"));
Assert.assertEquals(new Long(1), records.get(1).get("v"));
Assert.assertEquals(new String("world"), records.get(2).get("k"));
Assert.assertEquals(new Long(1), records.get(2).get("v"));
}
}
package com.aliyun.odps.examples.udf;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
/**
* project: example_project
* table: wc_in2
* partitions: p2=1,p1=2
* columns: colc,colb,cola
*/
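// Aggregates the total length of all input strings; the @Resolve annotation
// below declares the signature string -> bigint.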
@Resolve("string->bigint")
public class UDAFExample extends Aggregator {
@Override
public void iterate(Writable arg0, Writable[] arg1) throws UDFException {
LongWritable result = (LongWritable) arg0;
for (Writable item : arg1) {
Text txt = (Text) item;
result.set(result.get() + txt.getLength());
}
}
@Override
public void merge(Writable arg0, Writable arg1) throws UDFException {
LongWritable result = (LongWritable) arg0;
LongWritable partial = (LongWritable) arg1;
result.set(result.get() + partial.get());
}
@Override
public Writable newBuffer() {
return new LongWritable(0L);
}
@Override
public Writable terminate(Writable arg0) throws UDFException {
return arg0;
}
}
package com.aliyun.odps.examples.udf;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
/**
* project: example_project
* table: wc_in2
* partitions: p2=1,p1=2
* columns: colc,colb,cola
*/
@Resolve("string->bigint")
public class UDAFResource extends Aggregator {
ExecutionContext ctx;
long fileResourceLineCount;
long tableResource1RecordCount;
long tableResource2RecordCount;
@Override
public void setup(ExecutionContext ctx) throws UDFException {
this.ctx = ctx;
try {
InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
BufferedReader br = new BufferedReader(new InputStreamReader(in));
fileResourceLineCount = 0;
String line;
while ((line = br.readLine()) != null) {
fileResourceLineCount++;
}
br.close();
Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
tableResource1RecordCount = 0;
while (iterator.hasNext()) {
tableResource1RecordCount++;
iterator.next();
}
iterator = ctx.readResourceTable("table_resource2").iterator();
tableResource2RecordCount = 0;
while (iterator.hasNext()) {
tableResource2RecordCount++;
iterator.next();
}
} catch (IOException e) {
throw new UDFException(e);
}
}
@Override
public void iterate(Writable arg0, Writable[] arg1) throws UDFException {
LongWritable result = (LongWritable) arg0;
for (Writable item : arg1) {
Text txt = (Text) item;
result.set(result.get() + txt.getLength());
}
}
@Override
public void merge(Writable arg0, Writable arg1) throws UDFException {
LongWritable result = (LongWritable) arg0;
LongWritable partial = (LongWritable) arg1;
result.set(result.get() + partial.get());
}
@Override
public Writable newBuffer() {
return new LongWritable(0L);
}
@Override
public Writable terminate(Writable arg0) throws UDFException {
LongWritable result = (LongWritable) arg0;
result.set(result.get() + fileResourceLineCount + tableResource1RecordCount
+ tableResource2RecordCount);
return result;
}
}
package com.aliyun.odps.examples.udf;
import com.aliyun.odps.udf.UDF;
public class UDFExample extends UDF {
/**
* project: example_project
* table: wc_in1
* columns: col1
*/
public String evaluate(String a) {
return "s2s:" + a;
}
/**
* project: example_project
* table: wc_in1
* columns: col1,col2
*/
public String evaluate(String a, String b) {
return "ss2s:" + a + "," + b;
}
/**
* project: example_project
* table: wc_in2
* partitions: p2=1,p1=2
* columns: colc,colb,cola
*/
public String evaluate(String a, String b, String c) {
return "sss2s:" + a + "," + b + "," + c;
}
}
package com.aliyun.odps.examples.udf;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
public class UDFResource extends UDF {
ExecutionContext ctx;
long fileResourceLineCount;
long tableResource1RecordCount;
long tableResource2RecordCount;
@Override
public void setup(ExecutionContext ctx) throws UDFException {
this.ctx = ctx;
try {
InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line;
fileResourceLineCount = 0;
while ((line = br.readLine()) != null) {
fileResourceLineCount++;
}
br.close();
Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
tableResource1RecordCount = 0;
while (iterator.hasNext()) {
tableResource1RecordCount++;
iterator.next();
}
iterator = ctx.readResourceTable("table_resource2").iterator();
tableResource2RecordCount = 0;
while (iterator.hasNext()) {
tableResource2RecordCount++;
iterator.next();
}
} catch (IOException e) {
throw new UDFException(e);
}
}
/**
* project: example_project table: wc_in2 partitions: p2=1,p1=2 columns: colc,colb
*/
public String evaluate(String a, String b) {
return "ss2s:" + a + "," + b + "|fileResourceLineCount=" + fileResourceLineCount
+ "|tableResource1RecordCount=" + tableResource1RecordCount + "|tableResource2RecordCount="
+ tableResource2RecordCount;
}
}
package com.aliyun.odps.examples.udf;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
/**
* project: example_project
* table: wc_in2
* partitions: p2=1,p1=2
* columns: colc,colb
*/
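// For each input row this UDTF forwards (first column, length of second
// column); a null second column counts as length 0.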
@Resolve({"string,string->string,bigint"})
public class UDTFExample extends UDTF {
@Override
public void process(Object[] args) throws UDFException {
String a = (String) args[0];
long b = args[1] == null ? 0 : ((String) args[1]).length();
forward(a, b);
}
}
package com.aliyun.odps.examples.udf;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
/**
* project: example_project
* table: wc_in2
* partitions: p2=1,p1=2
* columns: colc,colb
*/
@Resolve({"string,string->string,bigint,string"})
public class UDTFResource extends UDTF {
ExecutionContext ctx;
long fileResourceLineCount;
long tableResource1RecordCount;
long tableResource2RecordCount;
@Override
public void setup(ExecutionContext ctx) throws UDFException {
this.ctx = ctx;
try {
InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line;
fileResourceLineCount = 0;
while ((line = br.readLine()) != null) {
fileResourceLineCount++;
}
br.close();
Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
tableResource1RecordCount = 0;
while (iterator.hasNext()) {
tableResource1RecordCount++;
iterator.next();
}
iterator = ctx.readResourceTable("table_resource2").iterator();
tableResource2RecordCount = 0;
while (iterator.hasNext()) {
tableResource2RecordCount++;
iterator.next();
}
} catch (IOException e) {
throw new UDFException(e);
}
}
@Override
public void process(Object[] args) throws UDFException {
String a = (String) args[0];
long b = args[1] == null ? 0 : ((String) args[1]).length();
forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount="
+ tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);
}
}
package com.aliyun.odps.examples.udf.test;
import com.aliyun.odps.examples.TestUtil;
import com.aliyun.odps.udf.local.datasource.InputSource;
import com.aliyun.odps.udf.local.datasource.TableInputSource;
import com.aliyun.odps.udf.local.runner.AggregatorRunner;
import com.aliyun.odps.udf.local.runner.BaseRunner;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.List;
public class UDAFTest {
@BeforeClass
public static void initWarehouse() {
TestUtil.initWarehouse();
}
@Test
public void simpleInput() throws Exception{
BaseRunner runner = new AggregatorRunner(null,
"com.aliyun.odps.examples.udf.UDAFExample");
runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
.feed(new Object[] { "four", "four" });
List<Object[]> out = runner.yield();
Assert.assertEquals(1, out.size());
Assert.assertEquals(24L, out.get(0)[0]);
}
@Test
public void inputFromTable() throws Exception{
BaseRunner runner = new AggregatorRunner(TestUtil.getOdps(),
"com.aliyun.odps.examples.udf.UDAFExample");
// partition table
String project = "example_project";
String table = "wc_in2";
String[] partitions = new String[] { "p2=1", "p1=2" };
String[] columns = new String[] { "colc", "cola" };
InputSource inputSource = new TableInputSource(project, table, partitions, columns);
Object[] data;
while ((data = inputSource.getNextRow()) != null) {
runner.feed(data);
}
List<Object[]> out = runner.yield();
Assert.assertEquals(1, out.size());
Assert.assertEquals(36L, out.get(0)[0]);
}
@Test
public void resourceTest() throws Exception{
BaseRunner runner = new AggregatorRunner(TestUtil.getOdps(),
"com.aliyun.odps.examples.udf.UDAFResource");
runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
.feed(new Object[] { "four", "four" });
List<Object[]> out = runner.yield();
Assert.assertEquals(1, out.size());
// 24 (sum of input string lengths) + 3 (file resource lines) + 4 + 4 (table resource records)
Assert.assertEquals(35L, out.get(0)[0]);
}
}
package com.aliyun.odps.examples.udf.test;
import com.aliyun.odps.examples.TestUtil;
import com.aliyun.odps.udf.local.datasource.InputSource;
import com.aliyun.odps.udf.local.datasource.TableInputSource;
import com.aliyun.odps.udf.local.runner.BaseRunner;
import com.aliyun.odps.udf.local.runner.UDFRunner;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.List;
public class UDFTest {
@BeforeClass
public static void initWarehouse() {
TestUtil.initWarehouse();
}
@Test
public void simpleInput() throws Exception{
BaseRunner runner = new UDFRunner(null, "com.aliyun.odps.examples.udf.UDFExample");
runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
.feed(new Object[] { "four", "four" });
List<Object[]> out = runner.yield();
Assert.assertEquals(3, out.size());
Assert.assertEquals("ss2s:one,one", TestUtil.join(out.get(0)));
Assert.assertEquals("ss2s:three,three", TestUtil.join(out.get(1)));
Assert.assertEquals("ss2s:four,four", TestUtil.join(out.get(2)));
}
@Test
public void inputFromTable() throws Exception{
BaseRunner runner = new UDFRunner(TestUtil.getOdps(), "com.aliyun.odps.examples.udf.UDFExample");
String project = "example_project";
String table = "wc_in2";
String[] partitions = new String[] { "p2=1", "p1=2" };
String[] columns = new String[] { "colc", "cola" };
InputSource inputSource = new TableInputSource(project, table, partitions, columns);
Object[] data;
while ((data = inputSource.getNextRow()) != null) {
runner.feed(data);
}
List<Object[]> out = runner.yield();
Assert.assertEquals(3, out.size());
Assert.assertEquals("ss2s:three3,three1", TestUtil.join(out.get(0)));
Assert.assertEquals("ss2s:three3,three1", TestUtil.join(out.get(1)));
Assert.assertEquals("ss2s:three3,three1", TestUtil.join(out.get(2)));
}
@Test
public void resourceTest() throws Exception{
BaseRunner runner = new UDFRunner(TestUtil.getOdps(), "com.aliyun.odps.examples.udf.UDFResource");
runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
.feed(new Object[] { "four", "four" });
List<Object[]> out = runner.yield();
Assert.assertEquals(3, out.size());
Assert.assertEquals("ss2s:one,one|fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
TestUtil.join(out.get(0)));
Assert.assertEquals("ss2s:three,three|fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
TestUtil.join(out.get(1)));
Assert.assertEquals("ss2s:four,four|fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
TestUtil.join(out.get(2)));
}
}
package com.aliyun.odps.examples.udf.test;
import com.aliyun.odps.examples.TestUtil;
import com.aliyun.odps.udf.local.datasource.InputSource;
import com.aliyun.odps.udf.local.datasource.TableInputSource;
import com.aliyun.odps.udf.local.runner.BaseRunner;
import com.aliyun.odps.udf.local.runner.UDTFRunner;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.List;
public class UDTFTest {
@BeforeClass
public static void initWarehouse() {
TestUtil.initWarehouse();
}
@Test
public void simpleInput() throws Exception{
BaseRunner runner = new UDTFRunner(null, "com.aliyun.odps.examples.udf.UDTFExample");
runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
.feed(new Object[] { "four", "four" });
List<Object[]> out = runner.yield();
Assert.assertEquals(3, out.size());
Assert.assertEquals("one,3", TestUtil.join(out.get(0)));
Assert.assertEquals("three,5", TestUtil.join(out.get(1)));
Assert.assertEquals("four,4", TestUtil.join(out.get(2)));
}
@Test
public void inputFromTable() throws Exception{
BaseRunner runner = new UDTFRunner(TestUtil.getOdps(), "com.aliyun.odps.examples.udf.UDTFExample");
String project = "example_project";
String table = "wc_in2";
String[] partitions = new String[] { "p2=1", "p1=2" };
String[] columns = new String[] { "colc", "cola" };
InputSource inputSource = new TableInputSource(project, table, partitions, columns);
Object[] data;
while ((data = inputSource.getNextRow()) != null) {
runner.feed(data);
}
List<Object[]> out = runner.yield();
Assert.assertEquals(3, out.size());
Assert.assertEquals("three3,6", TestUtil.join(out.get(0)));
Assert.assertEquals("three3,6", TestUtil.join(out.get(1)));
Assert.assertEquals("three3,6", TestUtil.join(out.get(2)));
}
@Test
public void resourceTest() throws Exception{
BaseRunner runner = new UDTFRunner(TestUtil.getOdps(), "com.aliyun.odps.examples.udf.UDTFResource");
runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
.feed(new Object[] { "four", "four" });
List<Object[]> out = runner.yield();
Assert.assertEquals(3 + "", out.size() + "");
Assert.assertEquals("one,3,fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
TestUtil.join(out.get(0)));
Assert.assertEquals("three,5,fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
TestUtil.join(out.get(1)));
Assert.assertEquals("four,4,fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
TestUtil.join(out.get(2)));
}
}
package com.aliyun.odps.examples.udj;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Yieldable;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.udf.DataAttributes;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDJ;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.ArrayList;
import java.util.Iterator;
/** For each record of right table, find the nearest record of left table and
* merge two records.
*/
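// The @Resolve annotation below declares the UDJ output types
// (string, bigint, string); the column names are supplied by the
// ArrayRecord built in setup().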
@Resolve("->string,bigint,string")
public class PayUserLogMergeJoin extends UDJ {
private Record outputRecord;
/** Will be called prior to the data processing phase. User could implement
* this method to do initialization work.
*/
@Override
public void setup(ExecutionContext executionContext, DataAttributes dataAttributes) {
// Create the output record matching the output schema declared by @Resolve.
outputRecord = new ArrayRecord(new Column[]{
new Column("user_id", OdpsType.STRING),
new Column("time", OdpsType.BIGINT),
new Column("content", OdpsType.STRING)
});
}
/** Override this method to implement join logic.
* @param key Current join key
* @param left Group of records of left table corresponding to the current key
* @param right Group of records of right table corresponding to the current key
* @param output Used to output the result of UDJ
*/
@Override
public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
outputRecord.setString(0, key.getString(0));
if (!right.hasNext()) {
// Empty right group, do nothing.
return;
} else if (!left.hasNext()) {
// Empty left group. Output all records of right group without merge.
while (right.hasNext()) {
Record logRecord = right.next();
outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
outputRecord.setString(2, logRecord.getString(1));
output.yield(outputRecord);
}
return;
}
ArrayList<Record> pays = new ArrayList<>();
// The left group of records will be iterated from start to end once for
// each record of the right group, but the iterator cannot be reset,
// so we save every record of the left group into an ArrayList.
left.forEachRemaining(pay -> pays.add(pay.clone()));
while (right.hasNext()) {
Record log = right.next();
long logTime = log.getDatetime(0).getTime();
long minDelta = Long.MAX_VALUE;
Record nearestPay = null;
// Iterate through all records of left, and find the pay record that has
// the minimal difference in terms of time.
for (Record pay: pays) {
long delta = Math.abs(logTime - pay.getDatetime(0).getTime());
if (delta < minDelta) {
minDelta = delta;
nearestPay = pay;
}
}
// Merge the log record with nearest pay record and output to the result.
outputRecord.setBigint(1, log.getDatetime(0).getTime());
outputRecord.setString(2, mergeLog(nearestPay.getString(1), log.getString(1)));
output.yield(outputRecord);
}
}
String mergeLog(String payInfo, String logContent) {
return logContent + ", pay " + payInfo;
}
@Override
public void close() {
}
}
package com.aliyun.odps.examples.unstructured;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.io.InputStreamSet;
import com.aliyun.odps.io.SourceInputStream;
import com.aliyun.odps.udf.DataAttributes;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.Extractor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.HashMap;
public class SpeechSentenceSnrExtractor extends Extractor {
private final static Log logger = LogFactory.getLog(SpeechSentenceSnrExtractor.class);
private static final String MLF_FILE_ATTRIBUTE_KEY = "mlfFileName";
private static final String SPEECH_SAMPLE_RATE_KEY = "speechSampleRateInKHz";
private String mlfFileName;
private HashMap<String, UtteranceLabel> utteranceLabels;
private InputStreamSet inputs;
private DataAttributes attributes;
private double sampleRateInKHz;
public SpeechSentenceSnrExtractor(){
this.utteranceLabels = new HashMap<String, UtteranceLabel>();
}
@Override
public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes){
this.inputs = inputs;
this.attributes = attributes;
this.mlfFileName = this.attributes.getValueByKey(MLF_FILE_ATTRIBUTE_KEY);
if (this.mlfFileName == null){
throw new IllegalArgumentException("A mlf file must be specified in extractor attribute.");
}
String sampleRateInKHzStr = this.attributes.getValueByKey(SPEECH_SAMPLE_RATE_KEY);
if (sampleRateInKHzStr == null){
throw new IllegalArgumentException("The speech sampling rate must be specified in extractor attribute.");
}
this.sampleRateInKHz = Double.parseDouble(sampleRateInKHzStr);
try {
BufferedInputStream inputStream = ctx.readResourceFileAsStream(mlfFileName);
loadMlfLabelsFromResource(inputStream);
inputStream.close();
} catch (IOException e) {
throw new RuntimeException("reading model from mlf failed with exception " + e.getMessage());
}
}
@Override
public Record extract() throws IOException {
SourceInputStream inputStream = inputs.next();
if (inputStream == null){
return null;
}
String fileName = inputStream.getFileName();
fileName = fileName.substring(fileName.lastIndexOf('/') + 1);
logger.info("Processing wav file " + fileName);
// full file path: path/to/XXX.wav => XXX as id
String id = fileName.substring(0, fileName.lastIndexOf('.'));
long fileSize = inputStream.getFileSize();
if (fileSize > Integer.MAX_VALUE){
// technically a larger file can be read via multiple batches,
// but we simply do not support it in this example.
throw new IllegalArgumentException("Do not support speech file larger than 2G bytes");
}
byte[] buffer = new byte[(int)fileSize];
Column[] outputColumns = this.attributes.getRecordColumns();
ArrayRecord record = new ArrayRecord(outputColumns);
if (outputColumns.length != 2 || outputColumns[0].getType() != OdpsType.DOUBLE
|| outputColumns[1].getType() != OdpsType.STRING){
throw new IllegalArgumentException("Expecting output to of schema double|string.");
}
int readSize = inputStream.readToEnd(buffer);
inputStream.close();
double snr = computeSnr(id, buffer, readSize);
record.setDouble(0, snr);
record.setString(1, id);
logger.info(String.format("file [%s] snr computed to be [%f]db", fileName, snr));
return record;
}
@Override
public void close(){
//no-op
}
private void loadMlfLabelsFromResource(BufferedInputStream fileInputStream)
throws IOException {
BufferedReader br = new BufferedReader(new InputStreamReader(fileInputStream));
String line;
String id = "";
// here we rely on the particular format of the MLF file to load the labels
while ((line = br.readLine()) != null) {
if (line.trim().isEmpty()){
continue;
}
if (line.startsWith("id:")){
id = line.split(":")[1].trim();
}
else{
// in this branch, line must be the label
this.utteranceLabels.put(id, new UtteranceLabel(id, line, " "));
}
}
}
// compute the snr of the speech sentence, assuming the input buffer contains the entire content of a wav file
private double computeSnr(String id, byte[] buffer, int validBufferLen){
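// Layout assumed by the code below: a 44-byte wav header followed by 16-bit
// little-endian PCM samples; frames are 10 ms long, frame labels mark noise (0)
// vs speech (1), and SNR = 10 * log10(avgSpeechPower / avgNoisePower).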
final int headerLength = 44;
if (validBufferLen < headerLength){
throw new IllegalArgumentException("A wav buffer must be at least larger than standard wav header size.");
}
// each frame is 10 ms
int sampleCountPerFrame = (int) (this.sampleRateInKHz * 10);
// each data point denoted by a short integer (2 bytes)
int dataLen = (validBufferLen - headerLength) / 2;
if (dataLen % sampleCountPerFrame != 0){
throw new IllegalArgumentException(
String.format("Invalid wav file where dataLen %d does not divide sampleCountPerFrame %d",
dataLen, sampleCountPerFrame));
}
// total number of frames in the wav file
int frameCount = dataLen / sampleCountPerFrame;
UtteranceLabel utteranceLabel = this.utteranceLabels.get(id);
if (utteranceLabel == null){
throw new IllegalArgumentException(String.format("Cannot find label of id %s from MLF.", id));
}
ArrayList<Long> labels = utteranceLabel.getLabels();
// usually frameCount should be larger than labels.size() by a small margin
// in our sample data, this margin is 2.
if (labels.size() + 2 != frameCount){
throw new IllegalArgumentException(String.format("Mismatched frame labels size %d and frameCount %d.",
labels.size() + 2, frameCount));
}
int offset = headerLength;
short[] data = new short[sampleCountPerFrame];
double[] energies = new double[frameCount];
for (int i = 0; i < frameCount; i++ ){
ByteBuffer.wrap(buffer, offset, sampleCountPerFrame * 2)
.order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(data);
double frameEnergy = 0;
for (int j = 0; j < sampleCountPerFrame; j++){
frameEnergy += data[j] * data[j];
}
energies[i] = frameEnergy;
offset += sampleCountPerFrame * 2;
}
double averageSpeechPower = 0;
double averageNoisePower = 0.00000001;
int speechframeCount = 0;
int noiseframeCount = 0;
for (int i = 0; i < labels.size(); i++){
if (labels.get(i) == 0){
averageNoisePower += energies[i];
noiseframeCount++;
} else {
averageSpeechPower += energies[i];
speechframeCount++;
}
}
if (noiseframeCount > 0){
averageNoisePower /= noiseframeCount;
} else {
// no noise, pure speech snr = max of 100db
return 100;
}
if (speechframeCount > 0) {
averageSpeechPower /= speechframeCount;
} else {
// no speech, pure noise, snr = min of -100db
return -100;
}
return 10 * Math.log10(averageSpeechPower/averageNoisePower);
}
}
class UtteranceLabel {
private String id; // id is the same as file name
private ArrayList<Long> labels;
private long labelIndex;
private long frameCount;
public String getId(){
return id;
}
public ArrayList<Long> getLabels(){
return this.labels;
}
UtteranceLabel(String id, String labelString, String labelDelimiter){
// note: no error checking here
this.labels = new ArrayList<Long>();
this.id = id;
final String[] splits = labelString.split(labelDelimiter);
if (splits.length < 2){
throw new InvalidParameterException("Invalid label line: at least index and length should be provided.");
}
this.labelIndex = Long.parseLong(splits[0]);
this.frameCount = Long.parseLong(splits[1]);
if (splits.length != frameCount + 2){
throw new InvalidParameterException("Label length mismatches label header meta.");
}
for (int i = 2; i < splits.length; i++){
long label = Long.parseLong(splits[i]);
// normalize vector entry to denote voice/non-voice, we only need this for snr computation
if (label >= 2057 && label <= 2059){
label = 0;
} else {
label = 1;
}
labels.add(label);
}
}
}
package com.aliyun.odps.examples.unstructured;
import com.aliyun.odps.udf.Extractor;
import com.aliyun.odps.udf.OdpsStorageHandler;
import com.aliyun.odps.udf.Outputer;
public class SpeechStorageHandler extends OdpsStorageHandler {
@Override
public Class<? extends Extractor> getExtractorClass() {
return SpeechSentenceSnrExtractor.class;
}
@Override
public Class<? extends Outputer> getOutputerClass() {
throw new UnsupportedOperationException();
}
}
package com.aliyun.odps.examples.unstructured;
import com.aliyun.odps.Column;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.io.InputStreamSet;
import com.aliyun.odps.udf.DataAttributes;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.Extractor;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
* Text extractor that extracts schematized records from formatted plain text (CSV, TSV, etc.)
**/
public class TextExtractor extends Extractor {
private InputStreamSet inputs;
private String columnDelimiter;
private DataAttributes attributes;
private BufferedReader currentReader;
private boolean firstRead = true;
public TextExtractor() {
// default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes)
this.columnDelimiter = ",";
}
// no particular usage for execution context in this example
@Override
public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) {
this.inputs = inputs;
this.attributes = attributes;
// check if "delimiter" attribute is supplied via SQL query
String columnDelimiter = this.attributes.getValueByKey("delimiter");
if ( columnDelimiter != null)
{
this.columnDelimiter = columnDelimiter;
}
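// e.g. WITH SERDEPROPERTIES ('delimiter'='\t') on the external table DDL supplies this attribute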
System.out.println("TextExtractor using delimiter [" + this.columnDelimiter + "].");
// note: more properties can be initialized from attributes if needed
}
@Override
public Record extract() throws IOException {
String line = readNextLine();
if (line == null) {
return null;
}
return textLineToRecord(line);
}
@Override
public void close(){
// no-op
}
private Record textLineToRecord(String line) throws IllegalArgumentException
{
Column[] outputColumns = this.attributes.getRecordColumns();
ArrayRecord record = new ArrayRecord(outputColumns);
if (this.attributes.getRecordColumns().length != 0){
// string copies are needed; not the most efficient approach, but it suffices as an example here
String[] parts = line.split(columnDelimiter);
int[] outputIndexes = this.attributes.getNeededIndexes();
if (outputIndexes == null){
throw new IllegalArgumentException("No outputIndexes supplied.");
}
if (outputIndexes.length != outputColumns.length){
throw new IllegalArgumentException("Mismatched output schema: Expecting "
+ outputColumns.length + " columns but get " + parts.length);
}
int index = 0;
for(int i = 0; i < parts.length; i++){
// only parse data in columns indexed by output indexes
if (index < outputIndexes.length && i == outputIndexes[index]){
switch (outputColumns[index].getType()) {
case STRING:
record.setString(index, parts[i]);
break;
case BIGINT:
record.setBigint(index, Long.parseLong(parts[i]));
break;
case BOOLEAN:
record.setBoolean(index, Boolean.parseBoolean(parts[i]));
break;
case DOUBLE:
record.setDouble(index, Double.parseDouble(parts[i]));
break;
case DATETIME:
case DECIMAL:
case ARRAY:
case MAP:
default:
throw new IllegalArgumentException("Type " + outputColumns[index].getType() + " not supported for now.");
}
index++;
}
}
}
return record;
}
/**
* Read next line from underlying input streams.
* @return The next line as a String. Returns null once all contents of the
* input streams have been read.
*/
private String readNextLine() throws IOException {
if (firstRead) {
firstRead = false;
// the first read, initialize things
currentReader = moveToNextStream();
if (currentReader == null) {
// empty input stream set
return null;
}
}
while (currentReader != null) {
String line = currentReader.readLine();
if (line != null) {
return line;
}
currentReader = moveToNextStream();
}
return null;
}
private BufferedReader moveToNextStream() throws IOException {
InputStream stream = inputs.next();
if (stream == null) {
return null;
} else {
return new BufferedReader(new InputStreamReader(stream));
}
}
}
package com.aliyun.odps.examples.unstructured;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.io.OutputStreamSet;
import com.aliyun.odps.io.SinkOutputStream;
import com.aliyun.odps.udf.DataAttributes;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.Outputer;
import java.io.IOException;
public class TextOutputer extends Outputer {
private SinkOutputStream outputStream;
private DataAttributes attributes;
private String delimiter;
public TextOutputer (){
// default delimiter, this can be overwritten if a delimiter is provided through the attributes.
this.delimiter = "|";
}
@Override
public void output(Record record) throws IOException {
this.outputStream.write(recordToString(record).getBytes());
}
// no particular usage of execution context in this example
@Override
public void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes) throws IOException {
this.outputStream = outputStreamSet.next();
this.attributes = attributes;
}
@Override
public void close() {
// no-op
}
private String recordToString(Record record){
StringBuilder sb = new StringBuilder();
for (int i = 0; i < record.getColumnCount(); i++)
{
if (null == record.get(i)){
sb.append("NULL");
}
else{
sb.append(record.get(i).toString());
}
if (i != record.getColumnCount() - 1){
sb.append(this.delimiter);
}
}
sb.append("\n");
return sb.toString();
}
}
package com.aliyun.odps.examples.unstructured;
import com.aliyun.odps.udf.Extractor;
import com.aliyun.odps.udf.OdpsStorageHandler;
import com.aliyun.odps.udf.Outputer;
public class TextStorageHandler extends OdpsStorageHandler {
@Override
public Class<? extends Extractor> getExtractorClass() {
return TextExtractor.class;
}
@Override
public Class<? extends Outputer> getOutputerClass() {
return TextOutputer.class;
}
}
package com.aliyun.odps.examples.unstructured.test;
import com.aliyun.odps.Column;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.examples.TestUtil;
import com.aliyun.odps.examples.unstructured.SpeechSentenceSnrExtractor;
import com.aliyun.odps.examples.unstructured.TextExtractor;
import com.aliyun.odps.udf.local.runner.ExtractorRunner;
import com.aliyun.odps.udf.local.util.LocalDataAttributes;
import com.aliyun.odps.udf.local.util.UnstructuredUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ExtractorTest {
private String ambulanceFullSchema =
"vehicle:bigint;id:bigint;patient:bigint;calls:bigint;latitude:double;longitude:double;time:string;direction:string";
private String speechDataFullSchema = "sentence_snr:double;id:string";
@BeforeClass
public static void initWarehouse() {
TestUtil.initWarehouse();
}
@Test
public void testTextExtractor() throws Exception {
/**
* Equivalent to the following SQL:
CREATE EXTERNAL TABLE ambulance_data_external
( vehicle bigint, id bigint, patient bigint, calls bigint,
Latitude double, Longitude double, time string, direction string)
STORED BY 'com.aliyun.odps.udf.example.text.TextStorageHandler'
LOCATION 'oss://.../data/ambulance_csv/'
USING 'jar_file_name.jar';
SELECT * FROM ambulance_data_external;
*/
Column[] externalTableSchema = UnstructuredUtils.parseSchemaString(ambulanceFullSchema);
// note: default delimiter used in TextExtractor is ','
LocalDataAttributes attributes = new LocalDataAttributes(null, externalTableSchema);
ExtractorRunner runner = new ExtractorRunner(TestUtil.getOdps(), new TextExtractor(), attributes);
//using local file directory to mock data source
runner.feedDirectory(TestUtil.class.getResource("/data/ambulance_csv/").getPath());
List<Record> records = runner.yieldRecords();
// do verification below
Assert.assertEquals(records.size(), 15);
ArrayRecord record0 = new ArrayRecord(externalTableSchema);
record0.set(0, (long)1);
record0.set(1, (long)1);
record0.set(2, (long)51);
record0.set(3, (long)1);
record0.set(4, 46.81006);
record0.set(5, -92.08174);
record0.set(6, "9/14/2014 0:00");
record0.set(7, "S");
Assert.assertTrue(UnstructuredUtils.recordsEqual(record0, records.get(0)));
}
@Test
public void testSpeechExtraction() throws Exception {
/**
* Equivalent to the following SQL:
CREATE EXTERNAL TABLE speech_snr_external
(sentence_snr double, id string)
STORED BY 'com.aliyun.odps.udf.example.speech.SpeechStorageHandler'
WITH SERDEPROPERTIES ('mlfFileName'='speech_model_random_5_utterance' , 'speechSampleRateInKHz' = '16')
LOCATION 'oss://.../data/speech_wav/'
USING 'jar_file_name.jar';
SELECT * FROM speech_snr_external;
*/
Column[] externalTableSchema = UnstructuredUtils.parseSchemaString(speechDataFullSchema);
Map<String, String> userProperties = new HashMap<String, String>();
// a file resource
userProperties.put("mlfFileName", "speech_model_random_5_utterance");
// an extractor parameter
userProperties.put("speechSampleRateInKHz", "16");
LocalDataAttributes attributes = new LocalDataAttributes(userProperties, externalTableSchema);
// SpeechSentenceSnrExtractor will analyze a speech wav file and output
// 1. the average sentence snr of a wav file
// 2. the corresponding wav file name
ExtractorRunner runner = new ExtractorRunner(TestUtil.getOdps(), new SpeechSentenceSnrExtractor(), attributes);
runner.feedDirectory(TestUtil.class.getResource("/data/speech_wav/").getPath());
List<Record> records = runner.yieldRecords();
// do verification below
Assert.assertEquals(records.size(), 3);
ArrayRecord record0 = new ArrayRecord(externalTableSchema);
record0.set(0, 31.39050062838079);
record0.set(1, "tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0");
Assert.assertTrue(UnstructuredUtils.recordsEqual(record0, records.get(0)));
ArrayRecord record1 = new ArrayRecord(externalTableSchema);
record1.set(0, 35.477360745366035);
record1.set(1, "tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0");
Assert.assertTrue(UnstructuredUtils.recordsEqual(record1, records.get(1)));
ArrayRecord record2 = new ArrayRecord(externalTableSchema);
record2.set(0, 16.046150955268665);
record2.set(1, "tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0");
Assert.assertTrue(UnstructuredUtils.recordsEqual(record2, records.get(2)));
}
}
package com.aliyun.odps.examples.unstructured.test;
import com.aliyun.odps.Column;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.examples.TestUtil;
import com.aliyun.odps.udf.example.text.TextOutputer;
import com.aliyun.odps.udf.local.runner.OutputerRunner;
import com.aliyun.odps.udf.local.util.LocalDataAttributes;
import com.aliyun.odps.udf.local.util.UnstructuredUtils;
import com.aliyun.odps.utils.StringUtils;
import org.junit.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class OutputerTest {
private String simpleTableSchema = "a:bigint;b:double;c:string";
private String adsLogTableSchema = "AdId:BIGINT;Rand:DOUBLE;AdvertiserName:STRING;Comment:STRING";
private File outputDirectory = null;
@BeforeClass
public static void initWarehouse() {
TestUtil.initWarehouse();
}
@Before
public void before() throws IOException{
// output directory preparation
outputDirectory = new File("temp/" + UnstructuredUtils.generateOutputName());
outputDirectory.delete();
outputDirectory.mkdirs();
}
@Test
public void testOutputSimpleText() throws Exception {
/**
* Test outputting manually constructed records to text
*/
Column[] externalTableSchema = UnstructuredUtils.parseSchemaString(simpleTableSchema);
LocalDataAttributes attributes = new LocalDataAttributes(null, externalTableSchema);
// TextOutputer will output one single file
OutputerRunner runner = new OutputerRunner(TestUtil.getOdps(), new TextOutputer(), attributes);
List<Record> records = new ArrayList<Record>();
records.add(new ArrayRecord(externalTableSchema, new Object[]{(long)1, 2.5, "row0"}));
records.add(new ArrayRecord(externalTableSchema, new Object[]{(long)1234567, 8.88, "row1"}));
records.add(new ArrayRecord(externalTableSchema, new Object[]{(long)12, 123.1, "testrow"}));
// run outputer
runner.feedRecords(records);
runner.yieldTo(outputDirectory.getAbsolutePath());
String expcetedOutput = "1|2.5|row0\n" +
"1234567|8.88|row1\n" +
"12|123.1|testrow\n";
verifySingleFileOutput(expectedOutput);
}
@Test
public void testOutputSpecialText() throws Exception {
/**
* Test reading from internal table and outputting to text file, with a user defined delimiter.
* Equivalent to the following SQL:
*
CREATE EXTERNAL TABLE ads_log_external
(AdId bigint, Rand double,
AdvertiserName string, Comment string)
STORED BY 'com.aliyun.odps.udf.example.text.TextStorageHandler'
WITH SERDEPROPERTIES ('delimiter'='\t')
LOCATION 'oss://path/to/output/'
USING 'jar_file_name.jar';
INSERT OVERWRITE ads_log_external SELECT * FROM ads_log;
* Here ads_log is an internal table (locally defined in warehouse directory)
*/
Column[] externalTableSchema = UnstructuredUtils.parseSchemaString(adsLogTableSchema);
Map<String, String> userProperties = new HashMap<String, String>();
userProperties.put("delimiter", "\t");
LocalDataAttributes attributes = new LocalDataAttributes(userProperties, externalTableSchema);
// TextOutputer outputs one single file
OutputerRunner runner = new OutputerRunner(TestUtil.getOdps(), new TextOutputer(), attributes);
String internalTableName = "ads_log";
// We are doing SELECT * FROM here, so the two tables have the same schema
Column[] internalTableSchema = externalTableSchema;
List<Record> records = new ArrayList<Record>();
Record record;
while ((record = UnstructuredUtils.readFromInternalTable("example_project", internalTableName,
internalTableSchema, null)) != null){
records.add(record.clone());
}
// run outputer
runner.feedRecords(records);
runner.yieldTo(outputDirectory.getAbsolutePath());
String expcetedOutput = "399266\t0.5\tDoritos\twhat is up\n" +
"399266\t0.0\tTacobell\thello!\n" +
"382045\t-76.0\tVoelkl\trandom comments\n" +
"382045\t6.4\tWhistler Resort\ta\n" +
"106479\t98.7\tAmazon Prime\tbdcd\n" +
"906441\t-9865788.2\tHayden Planetarium\tplatium\n" +
"351530\t0.005\tMicrosoft Azure Services\ttst\n";
verifySingleFileOutput(expectedOutput);
}
private void verifySingleFileOutput(String expectedOutput) throws IOException {
verifyFilesOutput(new String[]{expectedOutput});
}
private void verifyFilesOutput(String[] expectedOutputs) throws IOException {
File[] outputs = outputDirectory.listFiles();
Assert.assertEquals(outputs.length, expectedOutputs.length);
for (int i = 0; i < outputs.length; i++){
File outputFile = outputs[i];
byte[] data = new byte[(int)outputFile.length()];
try (FileInputStream fis = new FileInputStream(outputFile)) {
// read the whole file and make sure the stream is closed
Assert.assertEquals(fis.read(data), data.length);
}
String content = new String(data);
String[] rows = StringUtils.split(content, '\n');
String[] expectedRows = StringUtils.split(expectedOutputs[i], '\n');
// due to double presentation accuracy difference, the output may not exactly match expected,
// therefore we only verify that numbers of rows match.
Assert.assertEquals(rows.length, expectedRows.length);
}
}
}
1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S
1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W
1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S
1,6,9,1,46.81006,-92.08174,9/14/2014 0:00,S
1,1,40,1,46.81006,-92.08174,9/15/2014 0:00,NE
1,2,33,1,46.81006,-92.08174,9/15/2014 0:00,NE
1,3,60,1,46.81006,-92.08174,9/15/2014 0:00,NW
1,4,50,1,46.81006,-92.08174,9/15/2014 0:00,SW
1,5,50,1,46.81006,-92.08174,9/15/2014 0:00,S
1,6,53,1,46.81006,-92.08174,9/15/2014 0:00,NE
1,7,60,1,46.81006,-92.08174,9/15/2014 0:00,NE
1,8,75,1,46.81006,-92.08174,9/15/2014 0:00,E
1,9,75,1,46.81006,-92.08174,9/15/2014 0:00,E
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun.odps.myJava</groupId>
<artifactId>AprilModule</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-udf</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-udf-local</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-mapred-local</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-graph</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-graph-local</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.36</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.json/json -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20180130</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
</dependencies>
<properties>
<sdk.version>0.30.8-public</sdk.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<build>
<pluginManagement>
<plugins>
<!-- 编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- 编译java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar插件 -->
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com.alibaba</pattern>
<shadedPattern>shaded.com.alibaba</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package miya.epayment;
/**
* Created by admin on 2019/7/16.
*/
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve({"string,string->String"})
public class GetPromoteInfoUdf extends UDF {
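// Usage sketch (function name and tables are illustrative, not defined in this repo):
//   CREATE FUNCTION get_promote_info AS 'miya.epayment.GetPromoteInfoUdf' USING 'your-udf.jar';
//   SELECT get_promote_info(promote_amt, '1') FROM pay_log;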
public String evaluate(String promoteAmt,String promoteType) {
return Tools.getPromoteInfo(promoteAmt,promoteType);
}
}
package miya.epayment;
/**
* Created by admin on 2019/7/16.
*/
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve({"string,string->string"})
public class GetTotalAmtUdf extends UDF {
public String evaluate(String promoteAmt,String orderAmt) {
return Tools.getTotalAmt(promoteAmt,orderAmt);
}
}
package miya.epayment;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import org.json.JSONObject;
/**
* Created by admin on 2019/5/15.
*/
public class JsonUtils {
/**
* Get the value of the given key from a JSONObject as a String
*
* @param json
* @param key
* @return
*/
public static String jsonObjectGetString(JSONObject json, String key) {
if (null==json) {return null;}
try {
//String value = json.getString(key);
String value = json.get(key).toString();
return value;
} catch (Exception e) {
return null;
}
}
/**
* Get a value from a JSONObject by key1, falling back to key2
*
* @param json
* @param key1
* @param key2
* @return
*/
public static String jsonObjectGetString(JSONObject json, String key1, String key2) {
if (null==json) {return null;}
try {
// try key1 first; fall back to key2 when key1 is absent (get throws on a missing key)
if (json.has(key1)) {
return json.get(key1).toString();
}
return json.get(key2).toString();
} catch (Exception e) {
return null;
}
}
/**
* Get a nested JSONObject from a JSONObject by key
*
* @param json
* @param key
* @return
*/
public static JSONObject jsonObjectGetObj(JSONObject json, String key) {
JSONObject obj=null;
if (null==json) {return obj;}
try {
obj=json.getJSONObject(key);
return obj;
} catch (Exception e) {
return obj;
}
}
/**
* Merge json2 into json1 (fastjson)
*
* @param json1
* @param json2
*/
public static void jsonUnion(com.alibaba.fastjson.JSONObject json1, com.alibaba.fastjson.JSONObject json2) {
json1.putAll(json2);
}
/**
* Get a JSONArray from a fastjson JSONObject by key
*
* @param json
* @param key
* @return
*/
public static JSONArray jsonObjectGetArr(com.alibaba.fastjson.JSONObject json, String key) {
if (null==json) {return null;}
try {
JSONArray jsonArray = json.getJSONArray(key);
return jsonArray;
} catch (Exception e) {
return null;
}
}
/**
* Parse a JSONArray from a JSON string
*
* @param strJsonArr
* @return
*/
public static JSONArray str2JsonArr(String strJsonArr) {
try {
JSONArray jsonArray = JSON.parseArray(strJsonArr);
return jsonArray;
} catch (Exception e) {
return null;
}
}
}
package miya.epayment;
/**
* Created by admin on 2019/7/16.
*/
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve({"string->String"})
public class MatchPlatformUdf extends UDF {
public String evaluate(String platform) {
return Tools.matchPaymentPlatform(platform);
}
}
package miya.epayment;
/**
* Created by admin on 2019/7/16.
*/
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import org.json.JSONObject;
import org.json.XML;
import static miya.epayment.JsonUtils.jsonObjectGetObj;
import static miya.epayment.JsonUtils.jsonObjectGetString;
@Resolve({"string,string,string,string,string->string"})
public class PayLogUdf extends UDF {
public String evaluate(String xmlOrJson, String tradeType, String payType, String tradeStep, String position) {
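// Parameters (inferred from the dispatch logic below):
//   xmlOrJson - raw pay log payload (XML, possibly wrapped in an <xml> element)
//   tradeType - interface version: "1.0" (legacy) or "1.5"
//   payType   - A/B payment, C/D refund, E revoke, F pre-order, G other
//   tradeStep - "Trade00" (request) or a later step such as "Trade03" (result)
//   position  - numeric key of the field to extract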
try {
// ConcurrentHashMap<Long, String> valueJSon = new ConcurrentHashMap<>();
if ("Trade00".equals(tradeStep) && "1.5".equals(tradeType)) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
double goodsAmt = 0;
if (resJson.has("data") && resJson.has("request")) {
JSONObject request = jsonObjectGetObj(resJson, "request");
JSONObject data = jsonObjectGetObj(resJson, "data");
String b5 = jsonObjectGetString(data, "B5"); //商品明细信息
String a4 = jsonObjectGetString(request, "A4"); //门店 pos 机编号
String a5 = jsonObjectGetString(request, "A5"); //门店收银员编号
goodsAmt = Tools.getGoodsAmt(b5);
JSONObject valueJSon = new JSONObject();
valueJSon.put("8", goodsAmt+"");
valueJSon.put("10", a4);
valueJSon.put("11", a5);
return valueJSon.getString(position);
} else {
return null;
}
} else if ("Trade00".equals(tradeStep) && "1.0".equals(tradeType)) {
return dealOldTrade00(xmlOrJson, position);
} else {
if ("1.5".equals(tradeType)) {
if ("A".equals(payType) || "B".equals(payType)) {
return dealTrade03(xmlOrJson,position);
} else if ("C".equals(payType) || "D".equals(payType)) {
return dealTrade03Refund(xmlOrJson,position);
} else if ("E".equals(payType)) {
return dealTrade03Revoke(xmlOrJson,position);
} else if ("F".equals(payType)) {
return dealTrade03Preorder(xmlOrJson,position);
} else if ("G".equals(payType)) {
return dealTrade03G(xmlOrJson,position);
} else {
return dealTrade03(xmlOrJson,position);
}
} else if ("1.0".equals(tradeType)) {
return dealOldTrade03(xmlOrJson,position);
} else {
return null;
}
}
} catch (Exception e) {
return null;
}
}
/**
* Extract the cashier and POS terminal fields for the legacy 1.0 interface
*
* @param xmlOrJson
*/
private String dealOldTrade00(String xmlOrJson, String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
try {
resJson = json.getJSONObject("xml").getJSONObject("request");
} catch (Exception e) {
resJson = null;
}
}
JSONObject valueJSon = new JSONObject();
valueJSon.put("10", jsonObjectGetString(resJson, "userid"));
valueJSon.put("11", jsonObjectGetString(resJson, "cashier"));
return valueJSon.getString(position);
}
//A,B
private String dealTrade03(String xmlOrJson,String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
JSONObject valueJSon = new JSONObject();
valueJSon.put("0", jsonObjectGetString(resJson, "C2"));
valueJSon.put("1", jsonObjectGetString(resJson, "C3"));
valueJSon.put("2", jsonObjectGetString(resJson, "C4"));
valueJSon.put("3", jsonObjectGetString(resJson, "C5"));
valueJSon.put("4", jsonObjectGetString(resJson, "C6"));
valueJSon.put("5", jsonObjectGetString(resJson, "C7"));
valueJSon.put("6", jsonObjectGetString(resJson, "C10"));
valueJSon.put("9", Tools.getUserid(resJson));
valueJSon.put("13", jsonObjectGetString(resJson, "C9"));
valueJSon.put("14", jsonObjectGetString(resJson, "C11"));
return valueJSon.getString(position);
}
//C,D
private String dealTrade03Refund(String xmlOrJson , String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
JSONObject valueJSon = new JSONObject();
valueJSon.put("0", jsonObjectGetString(resJson, "C2"));
valueJSon.put("1", jsonObjectGetString(resJson, "C3"));
valueJSon.put("2", jsonObjectGetString(resJson, "C4"));
valueJSon.put("3", jsonObjectGetString(resJson, "C5"));
valueJSon.put("4", jsonObjectGetString(resJson, "C6"));
valueJSon.put("5", jsonObjectGetString(resJson, "C8"));
valueJSon.put("6", jsonObjectGetString(resJson, "C16"));
valueJSon.put("7", jsonObjectGetString(resJson, "C9"));
valueJSon.put("12", jsonObjectGetString(resJson, "C7"));
return valueJSon.getString(position);
}
//E
private String dealTrade03Revoke(String xmlOrJson ,String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
JSONObject valueJSon = new JSONObject();
valueJSon.put("0", jsonObjectGetString(resJson, "C2"));
valueJSon.put("1", jsonObjectGetString(resJson, "C3"));
valueJSon.put("2", jsonObjectGetString(resJson, "C4"));
valueJSon.put("3", jsonObjectGetString(resJson, "C5"));
// valueJSon.put("4", jsonObjectGetString(resJson, "C6"));
// valueJSon.put("5", jsonObjectGetString(resJson, "C10"));
return valueJSon.getString(position);
}
//F
private String dealTrade03Preorder(String xmlOrJson,String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
JSONObject valueJSon = new JSONObject();
valueJSon.put("0", jsonObjectGetString(resJson, "C2"));
valueJSon.put("1", jsonObjectGetString(resJson, "C3"));
valueJSon.put("2", jsonObjectGetString(resJson, "C4"));
valueJSon.put("3", jsonObjectGetString(resJson, "C5"));
valueJSon.put("5", jsonObjectGetString(resJson, "C7"));
return valueJSon.getString(position);
}
//G
private String dealTrade03G(String xmlOrJson ,String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
JSONObject valueJSon = new JSONObject();
valueJSon.put("0", jsonObjectGetString(resJson, "C2"));
valueJSon.put("1", jsonObjectGetString(resJson, "C3"));
valueJSon.put("2", jsonObjectGetString(resJson, "C4"));
valueJSon.put("3", jsonObjectGetString(resJson, "C5"));
valueJSon.put("4", jsonObjectGetString(resJson, "C6"));
valueJSon.put("5", jsonObjectGetString(resJson, "C7"));
return valueJSon.getString(position);
}
/**
* Processing logic for the legacy 1.0 interface
*
* @param xmlOrJson
*/
private String dealOldTrade03(String xmlOrJson,String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
JSONObject valueJSon = new JSONObject();
valueJSon.put("0", jsonObjectGetString(resJson, "trad_status"));
valueJSon.put("1", jsonObjectGetString(resJson, "error_code"));
valueJSon.put("2", jsonObjectGetString(resJson, "error_msg"));
valueJSon.put("3", jsonObjectGetString(resJson, "out_trade_no"));
valueJSon.put("4", jsonObjectGetString(resJson, "trade_no"));
valueJSon.put("5", jsonObjectGetString(resJson, "total_fee", "oldtotalfee"));
valueJSon.put("6", jsonObjectGetString(resJson, "fundbilllist"));
valueJSon.put("7", jsonObjectGetString(resJson, "refund_amount"));
valueJSon.put("9", jsonObjectGetString(resJson, "buyer_user_id"));
valueJSon.put("12", jsonObjectGetString(resJson, "out_refund_req_no"));
return valueJSon.getString(position);
}
}
package miya.epayment
import com.alibaba.fastjson.{JSON, JSONArray}
import org.json.{JSONObject, XML}
import scala.util.control.NonFatal
/**
* Created by hope on 2018/8/31.
*/
object Tools {
// map a payment platform short code to its full (Chinese) platform name via pattern matching
def matchPaymentPlatform(x: String): String = x match {
case "1" => "微信"
case "2" => "翼支付3.0"
case "D" => "支付宝"
case "3" => "支付宝"
case "K" => "工行"
case "4" => "百度钱包"
case "5" => "翼支付"
case "6" => "手Q"
case "7" => "银商资讯"
case "8" => "微众钱包"
case "A" => "大众点评"
case "B" => "银商预付卡"
case "C" => "京东钱包"
case "E" => "招行"
case "I" => "百糯"
case "L" => "飞凡"
case "M" => "华润银行"
case "N" => "银联二维码"
case "P" => "京东钱包"
case "T" => "润钱包"
case "U" => "移动和包"
case "S" => "汇付天下"
case "Q" => "银联商务"
case "9" => "微众有折"
case "F" => "支付宝国际化"
case "H" => "微信国际化"
case "J" => "微信香港钱包"
case "W" => "网商银行"
case "Y" => "沃支付"
case "R" => "京东3.0"
case "G" => "屈臣氏虚拟卡"
case _ => "未知"
}
def distinctStr(str: String): String = {
try {
val lastStr = str.split("-").filter(!"null".equals(_)).last
lastStr
} catch {
case e: Exception => {
null
}
}
}
def str2Arr(str: String): Array[String] = {
var strArray: Array[String] = null
try {
strArray = str.split(",")
strArray
} catch {
case e: Exception => {
strArray
}
}
}
def str2DoubleArr(str: String): Array[Double] = {
var doubleArray: Array[Double] = null
try {
doubleArray = Array[Double](str.toDouble)
doubleArray
} catch {
case e: Exception => {
doubleArray
}
}
}
def str2Double(str: String): Double = {
try {
str.toDouble
} catch {
case e: Exception => {
0
}
}
}
def str2Long(str: String): Long = {
try {
str.toLong
} catch {
case e: Exception => {
0
}
}
}
/**
* Derive the actual received amount from the promotion info: actual = actual deduction + platform promotion.
* Promotion amount format: [actualDeduction|merchantPromo|platformPromo|reserved|reserved|reserved]
* Example: [1500|0|0|0|0|0]
*
* @param promoteAmt
*
*/
def getActualReceived(promoteAmt: String, orderAmt: String): Long = {
try {
if ("[0|0|0|0|0|0]".equals(promoteAmt)) {
orderAmt.toLong
} else {
val promoteAmtArr: Array[String] = promoteAmt.replace("[", "").replace("]", "").split("[|]")
promoteAmtArr(0).toLong + promoteAmtArr(2).toLong
}
} catch {
case NonFatal(t) => {
try {
orderAmt.toLong
} catch {
case NonFatal(t) => 0
}
}
}
}
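// Worked example: getActualReceived("[1500|0|300|0|0|0]", "1800") = 1500 + 300 = 1800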
/**
* Derive the total amount from the promotion info: total = sum of all six promotion fields,
* falling back to orderAmt when the format is unexpected.
* Promotion amount format: [actualDeduction|merchantPromo|platformPromo|reserved|reserved|reserved]
* Example: [1500|0|0|0|0|0]
*
* @param promoteAmt
* @param orderAmt
*
*/
def getTotalAmt(promoteAmt: String, orderAmt: String): String = {
try {
// println("promoteAmt: " + promoteAmt)
// println("orderAmt: " + orderAmt)
// println("------------------------------")
if ("[0|0|0|0|0|0]".equals(promoteAmt) || promoteAmt.contains("bill")) {
orderAmt
} else {
val promoteAmtArr: Array[String] = promoteAmt.replace("[", "").replace("]", "").split("[|]")
promoteAmtArr.map(_.toLong).sum.toString
}
} catch {
case NonFatal(t) => {
try {
orderAmt
} catch {
case NonFatal(t) => null
}
}
}
}
/**
* Extract the requested promotion amount from the promotion info.
* Promotion amount format: [actualDeduction|merchantPromo|platformPromo|reserved|reserved|reserved]
* Example: [1500|0|0|0|0|0]
*
* @param promoteAmt [1500|0|0|0|0|0]
* @param promoteType promotion type: 1 - merchant promotion amount, 2 - platform promotion amount
*
*/
def getPromoteInfo(promoteAmt: String, promoteType: String): String = {
try {
if (null == promoteAmt || null == promoteType) {
null
} else {
if (promoteAmt.contains("bill")) {
//consumer_bill:1190;saas_bill:0;other_bill:0
val billArr: Array[String] = promoteAmt.split(";")
if ("2" == promoteType) {
val merchantPromoteAmt = billArr.filter(t => t.contains("other_bill")).last.split(":")(1)
if (merchantPromoteAmt.contains("null")) null else merchantPromoteAmt
} else {
val platformPromoteAmt = billArr.filter(t => t.contains("saas_bill")).last.split(":")(1)
if (platformPromoteAmt.contains("null")) null else platformPromoteAmt
}
} else {
val promoteInfo = promoteAmt.replace("[", "").replace("]", "").split("[|]")(promoteType.toInt)
if (promoteInfo.contains("null")) null else promoteInfo
}
}
} catch {
case NonFatal(t) => {
try {
null
} catch {
case NonFatal(t) => null
}
}
}
}
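// Worked examples (values illustrative):
//   getPromoteInfo("[1500|200|100|0|0|0]", "1") = "200"  (merchant promotion, index 1)
//   getPromoteInfo("[1500|200|100|0|0|0]", "2") = "100"  (platform promotion, index 2)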
/**
* Derive the total amount from a raw XML log: sums the C10 promotion fields,
* falling back to the C7 order amount.
* Promotion amount format: [actualDeduction|merchantPromo|platformPromo|reserved|reserved|reserved]
* Example: [1500|0|0|0|0|0]
*
* @param xmlLog
*
*/
def getTotalAmt(xmlLog: String): Long = {
try {
val json = XML.toJSONObject(xmlLog)
val xmlObj = json.getJSONObject("xml")
val promoteAmt = xmlObj.get("C10").toString
val orderAmt = xmlObj.get("C7").toString
if (null != promoteAmt && "" != promoteAmt && !"[0|0|0|0|0|0]".equals(promoteAmt)) {
val promoteAmtArr: Array[String] = promoteAmt.replace("[", "").replace("]", "").split("[|]")
promoteAmtArr.map(_.toLong).sum
} else if (null != orderAmt && "" != orderAmt) {
orderAmt.toLong
} else {
0
}
} catch {
case NonFatal(t) => {
0
}
}
}
/**
* Compute the total item count from the b5 item-detail JSON in the raw log
*
* @param b5
*
*/
def getGoodsAmt(b5: String): Double = {
try {
val goodsArr: JSONArray = JSON.parseArray(b5)
var total: Double = 0
for (i <- 0 until (goodsArr.size())) {
val quantity = goodsArr.getJSONObject(i).getString("quantity")
var num: Double = 0
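// quantities not ending in "0" (e.g. "2.143", presumably weighed goods) count as one item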
if (!quantity.endsWith("0")) {
num = 1.0
} else {
num = quantity.toDouble
}
total += num
}
total
} catch {
case e: Exception => {
0
}
}
}
/**
* Extract the userid from the raw log: WeChat messages carry it in C9,
* Alipay messages in C19
*
* @param trade03Msg
*
*/
def getUserid(trade03Msg: JSONObject): String = {
try {
if (trade03Msg.toString.contains("微信")) {
JsonUtils.jsonObjectGetString(trade03Msg, "C9")
} else {
JsonUtils.jsonObjectGetString(trade03Msg, "C19")
}
} catch {
case NonFatal(t) => {
null
}
}
}
def main(args: Array[String]): Unit = {
// println(getTotalAmt("[5748|177|0|0|0|0]"))
// println(getTotalAmt("[5748|177|0|0|0|0]"))
// println(getTotalAmt("[1500|0|0|0|0|0]"))
// println(getTotalAmt("", ""))
// println(getTotalAmt("{\"status\":\"00\",\"trade_no\":\"wx2817340979719447c99b520d1813199200\",\"wxsend\":{\"appId\":\"wx9443c4ef422fa1fa\",\"timeStamp\":\"1561714449835\",\"signType\":\"MD5\",\"package\":\"prepay_id=wx2817340979719447c99b520d1813199200\",\"nonceStr\":\"owJkGYTgfkqfXhBCvC8hNZ0RgSzEbgEo\",\"paySign\":\"361C5C0044E557134222A57CADC33473\"},\"payserial\":\"A30Q190628Q601004301020100\"}", "400"))
// println(getTotalAmt(null, "10840"))
// println(matchPaymentPlatform(distinctStr("B-A")))
// println(matchPaymentPlatform(distinctStr("B-B-B-B")))
// println(matchPaymentPlatform(distinctStr("B")))
//println(getGoodsAmt("[{\"goodsId\":\"11002177\",\"goodsName\":\"芒果干\",\"price\":\"159.8\",\"quantity\":\"2.143\"},{\"goodsId\":\"80000111\",\"goodsName\":\"中号纸提袋(常规版)\",\"price\":\"0.5\",\"quantity\":\"1.0\"},{\"goodsId\":\"11005934\",\"goodsName\":\"黄桃汁\",\"price\":\"9.8\",\"quantity\":\"1.0\"},{\"goodsId\":\"11002493\",\"goodsName\":\"黄桃果捞(糖水黄桃罐头)\",\"price\":\"7.9\",\"quantity\":\"2.0\"}]"))
// println(getTotalAmt("consumer_bill:8350;saas_bill:0;other_bill:0;brand_bill:0", "8350"))
println(getTotalAmt("[1480|0|0|0|0|0]", null))
}
}
package miya.udf;
/**
* Created by admin on 2019/7/16.
*/
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.List;
@Resolve({"array<string>->string"})
public class ArrayToStringUdf extends UDF {
public String evaluate(List<String> arr) {
return arr.toString();
}
}
package miya.udf;
/**
* Created by admin on 2019/7/16.
*/
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.List;
@Resolve({"array<string>->String"})
public class JsonArrReturnUdf extends UDF {
public String evaluate(List<String> keys) {
JSONArray resArr = new JSONArray();
for (int i = 0; i < keys.size(); i++) {
JSONObject resObj = new JSONObject();
String[] split = keys.get(i).split("-");
resObj.put("lat",split[0]);
resObj.put("lon",split[1]);
resArr.add(resObj);
}
return resArr.toJSONString();
}
}
package miya.udf;
/**
* Created by admin on 2019/7/16.
*/
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.List;
@Resolve({"array<string>,array<string>->String"})
public class JsonReturnUdf extends UDF {
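// intended pairing (sketch): evaluate(["k1","k2"], ["v1","v2"]) -> {"k1":"v1","k2":"v2"}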
public String evaluate(List<String> keys, List values) {
JSONObject resObj = new JSONObject();
for (int i = 0; i < keys.size(); i++) {
for (int j = 0; j < values.size(); j++) {
resObj.put(keys.get(i),values.get(j));
}
}
return resObj.toJSONString();
}
}
package miya.udf;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.HashSet;
import java.util.List;
/**
* Created by admin on 2019/9/18.
*/
@Resolve("array<string>,string,array<string>->array<string>")
//list1 为90天购物集合的short_id list2为一年内购物的short_id
public class LosePotential extends UDF {
public List<String> evaluate(List<String> list1, List<String> list2) {
try {
if (null == list1 || null == list2) {
return null;
} else {
// alternatives considered here (removeAll directly, a stream filter, LinkedList + Iterator)
// all compute list2 \ list1; building a HashSet over the smaller list keeps contains() at O(1)
HashSet<String> hashSet = new HashSet<>(list1);
list2.removeAll(hashSet);
return list2;
}
} catch (Exception e) {
return null;
}
}
}
package miya.udf;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.ArrayList;
import java.util.List;
/**
* Created by admin on 2019/9/18.
*/
@Resolve("array<string>,string,array<string>->array<string>")
// yearShortId: short_ids purchased within the past year; formatId: target format id; shortIdAndFormatId: "short_id||format_id" strings
public class NewPotential extends UDF {
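// e.g. evaluate(["a","b"], "F1", ["c||F1","a||F1","d||F2"]) -> ["c"]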
public List<String> evaluate(List<String> yearShortId, String formatId, List<String> shortIdAndFormatId) {
try {
if (null == yearShortId || null == formatId || null == shortIdAndFormatId) {
return null;
} else {
List<String> list = new ArrayList<>();
for (String string : shortIdAndFormatId) {
String[] arr = string.split("\\|\\|");
if (arr.length == 2 && arr[1].equals(formatId)) {
String shortId = arr[0];
if (!yearShortId.contains(shortId) && !shortId.equals("")) {
list.add(shortId);
}
}
}
return list;
}
} catch (Exception e) {
return null;
}
}
}
package miya.udf;
/**
* Created by admin on 2019/7/16.
*/
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve({"string->string"})
public class ReplacErrorStr extends UDF {
public String evaluate(String str) {
try {
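// strip everything except digits, ASCII letters and CJK unified ideographs (U+4E00-U+9FA5)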
return str.replaceAll("[^0-9a-zA-Z\u4e00-\u9fa5]+", "");
} catch (Exception e) {
return str;
}
}
}
package miya.udf;
/**
* Created by admin on 2019/7/16.
*/
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve({"string->double"})
public class StrToDoubleUdf extends UDF {
public Double evaluate(String str) {
try {
return Double.parseDouble(str);
} catch (Exception e) {
return 0d;
}
}
}
package miya.udf;
/**
* Created by admin on 2019/7/16.
*/
import org.json.JSONObject;
import org.json.XML;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
@Resolve({"string,string->String"})
public class XmlReturnValueUdf extends UDF {
public String evaluate(String xmlOrJson, String key) {
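// e.g. evaluate("<xml><C2>SUCCESS</C2></xml>", "C2") -> "SUCCESS"; any parse failure or missing key yields "未知" (unknown)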
try {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject xml = json.getJSONObject("xml");
String value = xml.get(key).toString();
return value;
} catch (Exception e) {
return "未知";
}
}
}
package miya.udtf;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import miya.utils.JsonUtils;
@Resolve({"string->String"})
public class CouponConsumeActivityUdtf extends UDTF {
@Override
public void process(Object[] args) throws UDFException {
try {
String strJsonArr = (String) args[0];
JSONArray couponsArr = JsonUtils.str2JsonArr(strJsonArr);
if (null!=couponsArr && couponsArr.size() >0){
for (int i = 0; i < couponsArr.size(); i++) {
JSONObject couponsObj = couponsArr.getJSONObject(i);
forward(couponsObj.toJSONString());
}
}else{
forward(null);
}
} catch (UDFException e) {
e.printStackTrace();
forward(null);
}
}
}
package miya.udtf;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import miya.utils.JsonUtils;
@Resolve({"string->string"})
public class CouponConsumeUdtf extends UDTF {
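// Explodes a coupon JSON array into one row per promGoodsDetails entry:
// each detail object is merged into its parent coupon before forwarding;
// a coupon without promGoodsDetails is forwarded once as-is.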
@Override
public void process(Object[] args) throws UDFException {
try {
String strJsonArr = (String) args[0];
JSONArray couponsArr = JsonUtils.str2JsonArr(strJsonArr);
if (null!=couponsArr && couponsArr.size() >0){
for (int i = 0; i < couponsArr.size(); i++) {
JSONObject couponsObj = couponsArr.getJSONObject(i);
JSONArray promGoodsDetailsArr =JsonUtils.jsonObjectGetArr(couponsObj,"promGoodsDetails");
if (null!=promGoodsDetailsArr && promGoodsDetailsArr.size() >0){
for (int j = 0; j < promGoodsDetailsArr.size(); j++) {
JSONObject promGoodsDetailsObj = promGoodsDetailsArr.getJSONObject(j);
couponsObj.putAll(promGoodsDetailsObj);
forward(couponsObj.toJSONString());
}
}else{
forward(couponsObj.toJSONString());
}
}
}else{
forward(null);
}
} catch (UDFException e) {
e.printStackTrace();
forward(null);
}
}
}
package miya.udtf;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import miya.utils.JsonUtils;
@Resolve({"string->String"})
public class JsonArrToObjUdtf extends UDTF {
@Override
public void process(Object[] args) throws UDFException {
try {
String strJsonArr = (String) args[0];
JSONArray jsonArr = JsonUtils.str2JsonArr(strJsonArr);
if (null!=jsonArr && jsonArr.size() >0){
for (int i = 0; i < jsonArr.size(); i++) {
JSONObject jsonObject = jsonArr.getJSONObject(i);
forward(jsonObject.toJSONString());
}
}else{
forward(null);
}
} catch (UDFException e) {
e.printStackTrace();
forward(null);
}
}
}
package miya.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
/**
* Created by admin on 2019/5/15.
*/
public class JsonUtils {
/**
* Get the value of the given key from a JSONObject as a String
*
* @param json
* @param key
* @return
*/
public static String jsonObjectGetString(JSONObject json, String key) {
if (null==json) {return null;}
try {
//String value = json.getString(key);
String value = json.get(key).toString();
return value;
} catch (Exception e) {
return null;
}
}
/**
* Get a nested JSONObject from a JSONObject by key
*
* @param json
* @param key
* @return
*/
public static JSONObject jsonObjectGetObj(JSONObject json, String key) {
JSONObject obj=null;
if (null==json) {return obj;}
try {
obj=json.getJSONObject(key);
return obj;
} catch (Exception e) {
return obj;
}
}
/**
* Get a JSONArray from a JSONObject by key
*
* @param json
* @param key
* @return
*/
public static JSONArray jsonObjectGetArr(JSONObject json, String key) {
if (null==json) {return null;}
try {
JSONArray jsonArray = json.getJSONArray(key);
return jsonArray;
} catch (Exception e) {
return null;
}
}
/**
* Parse a JSONArray from a JSON string
*
* @param strJsonArr
* @return
*/
public static JSONArray str2JsonArr(String strJsonArr) {
try {
JSONArray jsonArray = JSON.parseArray(strJsonArr);
return jsonArray;
} catch (Exception e) {
return null;
}
}
/**
* Merge json2 into json1
*
* @param json1
* @param json2
*/
public static void jsonUnion(JSONObject json1, JSONObject json2) {
json1.putAll(json2);
}
}
/**
* Created by admin on 2019/7/16.
*/
import miya.epayment.Tools;
import org.json.JSONObject;
import org.json.XML;
import java.util.concurrent.ConcurrentHashMap;
import static miya.epayment.JsonUtils.jsonObjectGetObj;
import static miya.epayment.JsonUtils.jsonObjectGetString;
public class PayLogUdf2 {
public static String evaluate(String xmlOrJson, String tradeType, String payType, String tradeStep, String position) {
try {
if ("Trade00".equals(tradeStep) && "1.5".equals(tradeType)) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
double goodsAmt = 0;
if (resJson.has("data") && resJson.has("request")) {
JSONObject request = jsonObjectGetObj(resJson, "request");
JSONObject data = jsonObjectGetObj(resJson, "data");
String b5 = jsonObjectGetString(data, "B5"); //商品明细信息
String a4 = jsonObjectGetString(request, "A4"); //门店 pos 机编号
String a5 = jsonObjectGetString(request, "A5"); //门店收银员编号
goodsAmt = Tools.getGoodsAmt(b5);
ConcurrentHashMap<String, String> conMap = new ConcurrentHashMap<>();
conMap.put("8", goodsAmt+"");
conMap.put("10", a4);
conMap.put("11", a5);
return conMap.get(position);
} else {
return null;
}
} else if ("Trade00".equals(tradeStep) && "1.0".equals(tradeType)) {
return dealOldTrade00(xmlOrJson, position);
} else {
if ("1.5".equals(tradeType)) {
if ("A".equals(payType) || "B".equals(payType)) {
return dealTrade03(xmlOrJson,position);
} else if ("C".equals(payType) || "D".equals(payType)) {
return dealTrade03Refund(xmlOrJson,position);
} else if ("E".equals(payType)) {
return dealTrade03Revoke(xmlOrJson,position);
} else if ("F".equals(payType)) {
return dealTrade03Preorder(xmlOrJson,position);
} else if ("G".equals(payType)) {
return dealTrade03G(xmlOrJson,position);
} else {
return dealTrade03(xmlOrJson,position);
}
} else if ("1.0".equals(tradeType)) {
return dealOldTrade03(xmlOrJson,position);
} else {
return null;
}
}
} catch (Exception e) {
return null;
}
}
/**
* Extract the cashier and POS terminal fields for the legacy 1.0 interface
*
* @param xmlOrJson
*/
private static String dealOldTrade00(String xmlOrJson, String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
try {
resJson = json.getJSONObject("xml").getJSONObject("request");
} catch (Exception e) {
resJson = null;
}
}
ConcurrentHashMap<String, String> conMap = new ConcurrentHashMap<>();
conMap.put("10", jsonObjectGetString(resJson, "userid"));
conMap.put("11", jsonObjectGetString(resJson, "cashier"));
return conMap.get(position);
}
//A,B
private static String dealTrade03(String xmlOrJson,String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
// ConcurrentHashMap<String, String> conMap = new ConcurrentHashMap<>();
JSONObject valueJSon = new JSONObject();
System.out.println(" kk,kk");
valueJSon.put("0", jsonObjectGetString(resJson, "C2"));
valueJSon.put("1", jsonObjectGetString(resJson, "C3"));
valueJSon.put("2", jsonObjectGetString(resJson, "C4"));
valueJSon.put("3", jsonObjectGetString(resJson, "C5"));
valueJSon.put("4", jsonObjectGetString(resJson, "C6"));
valueJSon.put("5", jsonObjectGetString(resJson, "C7"));
valueJSon.put("6", jsonObjectGetString(resJson, "C10"));
valueJSon.put("9", Tools.getUserid(resJson));
valueJSon.put("13", jsonObjectGetString(resJson, "C9"));
valueJSon.put("14", jsonObjectGetString(resJson, "C11"));
System.out.println(" kk,kk");
return valueJSon.getString(position);
}
//C,D
private static String dealTrade03Refund(String xmlOrJson , String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
ConcurrentHashMap<String, String> conMap = new ConcurrentHashMap<>();
conMap.put("0", jsonObjectGetString(resJson, "C2"));
conMap.put("1", jsonObjectGetString(resJson, "C3"));
conMap.put("2", jsonObjectGetString(resJson, "C4"));
conMap.put("3", jsonObjectGetString(resJson, "C5"));
conMap.put("4", jsonObjectGetString(resJson, "C6"));
conMap.put("5", jsonObjectGetString(resJson, "C8"));
conMap.put("6", jsonObjectGetString(resJson, "C16"));
conMap.put("7", jsonObjectGetString(resJson, "C9"));
conMap.put("12", jsonObjectGetString(resJson, "C7"));
return conMap.get(position);
}
//E
private static String dealTrade03Revoke(String xmlOrJson ,String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
ConcurrentHashMap<String, String> conMap = new ConcurrentHashMap<>();
conMap.put("0", jsonObjectGetString(resJson, "C2"));
conMap.put("1", jsonObjectGetString(resJson, "C3"));
conMap.put("2", jsonObjectGetString(resJson, "C4"));
conMap.put("3", jsonObjectGetString(resJson, "C5"));
conMap.put("4", jsonObjectGetString(resJson, "C6"));
conMap.put("5", jsonObjectGetString(resJson, "C10"));
return conMap.get(position);
}
//F
private static String dealTrade03Preorder(String xmlOrJson,String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
ConcurrentHashMap<String, String> conMap = new ConcurrentHashMap<>();
conMap.put("0", jsonObjectGetString(resJson, "C2"));
conMap.put("1", jsonObjectGetString(resJson, "C3"));
conMap.put("2", jsonObjectGetString(resJson, "C4"));
conMap.put("3", jsonObjectGetString(resJson, "C5"));
conMap.put("5", jsonObjectGetString(resJson, "C7"));
return conMap.get(position);
}
//G
private static String dealTrade03G(String xmlOrJson ,String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
ConcurrentHashMap<String, String> conMap = new ConcurrentHashMap<>();
conMap.put("0", jsonObjectGetString(resJson, "C2"));
conMap.put("1", jsonObjectGetString(resJson, "C3"));
conMap.put("2", jsonObjectGetString(resJson, "C4"));
conMap.put("3", jsonObjectGetString(resJson, "C5"));
conMap.put("4", jsonObjectGetString(resJson, "C6"));
conMap.put("5", jsonObjectGetString(resJson, "C7"));
return conMap.get(position);
}
/**
* Processing logic for the legacy 1.0 interface
*
* @param xmlOrJson
*/
private static String dealOldTrade03(String xmlOrJson,String position) {
JSONObject json = XML.toJSONObject(xmlOrJson);
JSONObject resJson = null;
if (!json.has("xml")) {
resJson = json;
} else {
resJson = json.getJSONObject("xml");
}
ConcurrentHashMap<String, String> conMap = new ConcurrentHashMap<>();
conMap.put("0", jsonObjectGetString(resJson, "trad_status"));
conMap.put("1", jsonObjectGetString(resJson, "error_code"));
conMap.put("2", jsonObjectGetString(resJson, "error_msg"));
conMap.put("3", jsonObjectGetString(resJson, "out_trade_no"));
conMap.put("4", jsonObjectGetString(resJson, "trade_no"));
conMap.put("5", jsonObjectGetString(resJson, "total_fee", "oldtotalfee"));
conMap.put("6", jsonObjectGetString(resJson, "fundbilllist"));
conMap.put("7", jsonObjectGetString(resJson, "refund_amount"));
conMap.put("9", jsonObjectGetString(resJson, "buyer_user_id"));
conMap.put("12", jsonObjectGetString(resJson, "out_refund_req_no"));
return conMap.get(position);
}
}
/**
* Created by admin on 2020/3/26.
*/
public class PayTest {
public static void main(String[] args) {
String evaluate = PayLogUdf2.evaluate(
"<?xml version=\"1.0\" encoding=\"UTF-8\"?><xml> <C2>CANCELSUCCESS</C2> <C8>1480</C8> <C3>CANCELSUCCESS</C3> <C14>[和包]订单4894489412020032412585830撤销成功!</C14> <C10>U</C10> <C7>U</C7> <C4>[和包]订单4894489412020032412585830撤销成功!</C4> <C1>SUCCESS</C1> <C5>4894489412020032412585830</C5> <C11>2020-03-24 12:59:53</C11> <C24>和包</C24> <C30>325CA0EE3C53CBF26383A8B2590F5EB8</C30></xml>"
, "1.5", "E", "Trade03", "5"
);
System.out.println(evaluate);
}
}