Merge the dev branch.
shaomengwang committed Mar 14, 2023
1 parent 7a5dca7 commit deea80f
Showing 1,693 changed files with 59,493 additions and 7,312 deletions.
@@ -12,7 +12,7 @@
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.types.Row;
 
-import com.alibaba.alink.common.AlinkTypes;
+import com.alibaba.alink.common.type.AlinkTypes;
 import com.alibaba.alink.common.io.catalog.datahub.datastream.util.DatahubClientProvider;
 import com.aliyun.datahub.client.model.Field;
 import com.aliyun.datahub.client.model.FieldType;
@@ -35,6 +35,7 @@
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.Ignore;
 import org.junit.rules.TemporaryFolder;
 
 import java.math.BigDecimal;
@@ -44,6 +45,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 
+@Ignore
 public class SqliteCatalogTest {
 
 	@ClassRule
@@ -481,4 +483,4 @@ public void sinkBatch() throws Exception {
 			).collect().isEmpty()
 		);
 	}
-}
+}
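Note: in JUnit 4 a class-level @Ignore skips every test in the class, so the hunks above appear to take the whole SqliteCatalogTest suite out of the test run. A minimal self-contained illustration (class name and message are hypothetical):

import org.junit.Ignore;
import org.junit.Test;

@Ignore("suite temporarily disabled")
public class IgnoredSuiteExample {
	@Test
	public void neverRuns() {
		// JUnit 4 reports this as skipped, not failed, while the class is ignored.
		throw new AssertionError("unreachable");
	}
}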
@@ -1,4 +1,4 @@
-package com.alibaba.alink.common.io.kafka.plugin;
+package com.alibaba.alink.common.io.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -1,4 +1,4 @@
-package com.alibaba.alink.common.io.kafka.plugin;
+package com.alibaba.alink.common.io.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -1,4 +1,4 @@
-package com.alibaba.alink.common.io.kafka.plugin;
+package com.alibaba.alink.common.io.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -1,4 +1,4 @@
-package com.alibaba.alink.common.io.kafka.plugin;
+package com.alibaba.alink.common.io.kafka;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.ml.api.misc.param.Params;
@@ -9,7 +9,8 @@
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
-import com.alibaba.alink.common.io.kafka.plugin.KafkaSourceBuilder.StartupMode;
+import com.alibaba.alink.common.io.kafka.KafkaSourceBuilder.StartupMode;
+import com.alibaba.alink.operator.stream.sink.KafkaSourceSinkFactory;
 import com.alibaba.alink.params.io.KafkaSinkParams;
 import com.alibaba.alink.params.io.KafkaSourceParams;
 
@@ -1,4 +1,4 @@
-package com.alibaba.alink.common.io.kafka.plugin;
+package com.alibaba.alink.common.io.kafka;
 
 import java.text.SimpleDateFormat;
 
@@ -1,7 +1,5 @@
 package com.alibaba.alink.common;
 
-import org.apache.flink.runtime.util.EnvironmentInformation;
-
 import com.alibaba.alink.common.io.plugin.PluginConfig;
 import com.alibaba.alink.common.io.plugin.PluginDownloader;
 
@@ -5,9 +5,11 @@
 import com.alibaba.alink.operator.local.sql.CalciteFunctionCompiler;
 
 public class LocalMLEnvironment {
-	private static final LocalMLEnvironment INSTANCE = new LocalMLEnvironment();
-
-	private final LocalOpCalciteSqlExecutor sqlExecutor;
+	private static final ThreadLocal<LocalMLEnvironment> threadLocalEnv = ThreadLocal.withInitial(() -> new LocalMLEnvironment());
+	/**
+	 * lazy load for speed.
+	 */
+	private LocalOpCalciteSqlExecutor sqlExecutor;
 
 	// Compile user defined functions. We need to use its latest classloader when executing SQL.
 	private final CalciteFunctionCompiler calciteFunctionCompiler;
@@ -16,19 +18,21 @@ public class LocalMLEnvironment {
 
 	private LocalMLEnvironment() {
 		calciteFunctionCompiler = new CalciteFunctionCompiler(Thread.currentThread().getContextClassLoader());
-		sqlExecutor = new LocalOpCalciteSqlExecutor(this);
 		lazyObjectsManager = new LocalLazyObjectsManager();
 	}
 
 	public static LocalMLEnvironment getInstance() {
-		return INSTANCE;
+		return threadLocalEnv.get();
 	}
 
 	public CalciteFunctionCompiler getCalciteFunctionCompiler() {
 		return calciteFunctionCompiler;
 	}
 
-	public LocalOpCalciteSqlExecutor getSqlExecutor() {
+	public synchronized LocalOpCalciteSqlExecutor getSqlExecutor() {
+		if (sqlExecutor == null) {
+			sqlExecutor = new LocalOpCalciteSqlExecutor(this);
+		}
 		return sqlExecutor;
 	}
 
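Note: the two hunks above replace the process-wide LocalMLEnvironment singleton with one lazily created instance per thread, and getSqlExecutor() now builds its LocalOpCalciteSqlExecutor on first use. A minimal sketch of the same pattern under stand-in names (PerThreadEnv and its Object field are illustrative, not Alink classes):

public final class PerThreadEnv {
	// One instance per thread, created on that thread's first access.
	private static final ThreadLocal<PerThreadEnv> ENV =
		ThreadLocal.withInitial(PerThreadEnv::new);

	// Expensive member, built lazily so environment construction stays cheap.
	private Object sqlExecutor;

	private PerThreadEnv() {
	}

	public static PerThreadEnv getInstance() {
		return ENV.get();
	}

	// synchronized keeps the lazy init safe even if an instance escapes its thread.
	public synchronized Object getSqlExecutor() {
		if (sqlExecutor == null) {
			sqlExecutor = new Object(); // stand-in for LocalOpCalciteSqlExecutor
		}
		return sqlExecutor;
	}
}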
42 changes: 24 additions & 18 deletions core/src/main/java/com/alibaba/alink/common/MLEnvironment.java
@@ -13,13 +13,14 @@
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
-import com.alibaba.alink.common.MTable.MTableKryoSerializer;
+import com.alibaba.alink.common.MTable.MTableKryoSerializerV2;
+import com.alibaba.alink.common.exceptions.AkIllegalArgumentException;
 import com.alibaba.alink.common.lazy.LazyObjectsManager;
 import com.alibaba.alink.common.linalg.tensor.Tensor;
 import com.alibaba.alink.common.linalg.tensor.TensorKryoSerializer;
-import com.alibaba.alink.common.sql.builtin.BuildInAggRegister;
-import com.alibaba.alink.common.utils.DataSetConversionUtil;
-import com.alibaba.alink.common.utils.DataStreamConversionUtil;
+import com.alibaba.alink.common.sql.builtin.BuiltInAggRegister;
+import com.alibaba.alink.operator.batch.utils.DataSetConversionUtil;
+import com.alibaba.alink.operator.stream.utils.DataStreamConversionUtil;
 import com.alibaba.alink.operator.batch.BatchOperator;
 import com.alibaba.alink.operator.batch.source.TableSourceBatchOp;
 import com.alibaba.alink.operator.stream.StreamOperator;
@@ -104,20 +105,22 @@ public MLEnvironment(
 		this.streamEnv = streamEnv;
 		this.streamTableEnv = streamTableEnv;
 		if (this.env != null) {
-			env.addDefaultKryoSerializer(MTable.class, new MTableKryoSerializer());
+			env.addDefaultKryoSerializer(MTable.class, new MTableKryoSerializerV2());
 			env.addDefaultKryoSerializer(Tensor.class, new TensorKryoSerializer());
 		}
 		if (this.streamEnv != null) {
-			streamEnv.addDefaultKryoSerializer(MTable.class, new MTableKryoSerializer());
+			streamEnv.addDefaultKryoSerializer(MTable.class, new MTableKryoSerializerV2());
 			streamEnv.addDefaultKryoSerializer(Tensor.class, new TensorKryoSerializer());
 		}
 		if (this.batchTableEnv != null) {
-			BuildInAggRegister.registerUdf(this.batchTableEnv);
-			BuildInAggRegister.registerUdaf(this.batchTableEnv);
+			BuiltInAggRegister.registerUdf(this.batchTableEnv);
+			BuiltInAggRegister.registerUdtf(this.batchTableEnv);
+			BuiltInAggRegister.registerUdaf(this.batchTableEnv);
 		}
 		if (this.streamTableEnv != null) {
-			BuildInAggRegister.registerUdf(this.streamTableEnv);
-			BuildInAggRegister.registerUdaf(this.streamTableEnv);
+			BuiltInAggRegister.registerUdf(this.streamTableEnv);
+			BuiltInAggRegister.registerUdtf(this.streamTableEnv);
+			BuiltInAggRegister.registerUdaf(this.streamTableEnv);
 		}
 	}
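Note: the constructor now registers MTableKryoSerializerV2 in place of MTableKryoSerializer; Flink falls back to such a default serializer whenever it has to serialize the type through Kryo. A self-contained sketch of the wiring, with a hypothetical POJO standing in for MTable:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.apache.flink.api.java.ExecutionEnvironment;

public class KryoWiringSketch {
	public static class MyPojo {
		public String value;
	}

	public static class MyPojoSerializer extends Serializer<MyPojo> implements java.io.Serializable {
		@Override
		public void write(Kryo kryo, Output output, MyPojo obj) {
			output.writeString(obj.value);
		}

		@Override
		public MyPojo read(Kryo kryo, Input input, Class<MyPojo> type) {
			MyPojo p = new MyPojo();
			p.value = input.readString();
			return p;
		}
	}

	public static void main(String[] args) {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		// Used whenever Flink serializes MyPojo via its Kryo fallback.
		env.addDefaultKryoSerializer(MyPojo.class, new MyPojoSerializer());
	}
}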

@@ -150,7 +153,7 @@ public ExecutionEnvironment getExecutionEnvironment() {
 				env = ExecutionEnvironment.getExecutionEnvironment();
 			}
 
-			env.addDefaultKryoSerializer(MTable.class, new MTableKryoSerializer());
+			env.addDefaultKryoSerializer(MTable.class, new MTableKryoSerializerV2());
 			env.addDefaultKryoSerializer(Tensor.class, new TensorKryoSerializer());
 		}
 		return env;
@@ -166,7 +169,7 @@ public ExecutionEnvironment getExecutionEnvironment() {
 	public StreamExecutionEnvironment getStreamExecutionEnvironment() {
 		if (null == streamEnv) {
 			streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-			streamEnv.addDefaultKryoSerializer(MTable.class, new MTableKryoSerializer());
+			streamEnv.addDefaultKryoSerializer(MTable.class, new MTableKryoSerializerV2());
 			streamEnv.addDefaultKryoSerializer(Tensor.class, new TensorKryoSerializer());
 		}
 		return streamEnv;
@@ -182,8 +185,9 @@ public StreamExecutionEnvironment getStreamExecutionEnvironment() {
 	public BatchTableEnvironment getBatchTableEnvironment() {
 		if (null == batchTableEnv) {
 			batchTableEnv = BatchTableEnvironment.create(getExecutionEnvironment());
-			BuildInAggRegister.registerUdf(this.batchTableEnv);
-			BuildInAggRegister.registerUdaf(this.batchTableEnv);
+			BuiltInAggRegister.registerUdf(batchTableEnv);
+			BuiltInAggRegister.registerUdtf(batchTableEnv);
+			BuiltInAggRegister.registerUdaf(batchTableEnv);
 		}
 		return batchTableEnv;
 	}
@@ -205,8 +209,10 @@ public StreamTableEnvironment getStreamTableEnvironment() {
 					.useOldPlanner()
 					.build()
 			);
-			BuildInAggRegister.registerUdf(this.streamTableEnv);
-			BuildInAggRegister.registerUdaf(this.streamTableEnv);
+
+			BuiltInAggRegister.registerUdf(streamTableEnv);
+			BuiltInAggRegister.registerUdtf(streamTableEnv);
+			BuiltInAggRegister.registerUdaf(streamTableEnv);
 		}
 		return streamTableEnv;
 	}
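Note: besides the BuildInAggRegister -> BuiltInAggRegister rename, every environment now also registers table functions (registerUdtf) next to scalar and aggregate functions. A hedged sketch of what one such registration step looks like on the old-planner bridge API this file targets (PlusOne and its SQL name are hypothetical):

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public final class RegisterSketch {
	// Minimal scalar function so the sketch is self-contained.
	public static class PlusOne extends ScalarFunction {
		public long eval(long x) {
			return x + 1;
		}
	}

	public static void registerUdf(StreamTableEnvironment env) {
		// Old-planner registration by name; SQL can then call plus_one(x).
		env.registerFunction("plus_one", new PlusOne());
	}
}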
@@ -257,7 +263,7 @@ public Table createBatchTable(Row[] rows, String[] colNames) {
 	 */
 	public Table createBatchTable(List <Row> rows, String[] colNames) {
 		if (rows == null || rows.size() < 1) {
-			throw new IllegalArgumentException("Values can not be empty.");
+			throw new AkIllegalArgumentException("Values can not be empty.");
 		}
 
 		Row first = rows.iterator().next();
@@ -297,7 +303,7 @@ public Table createStreamTable(Row[] rows, String[] colNames) {
 	 */
 	public Table createStreamTable(List <Row> rows, String[] colNames) {
 		if (rows == null || rows.size() < 1) {
-			throw new IllegalArgumentException("Values can not be empty.");
+			throw new AkIllegalArgumentException("Values can not be empty.");
 		}
 
 		Row first = rows.iterator().next();
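Note: the empty-input guards now throw Alink's AkIllegalArgumentException instead of a plain IllegalArgumentException. A sketch of the guard pattern; the stand-in subclass below only assumes the real class (in com.alibaba.alink.common.exceptions) is an unchecked exception, which matches how it is thrown above:

import java.util.List;

public class GuardSketch {
	// Hypothetical stand-in for com.alibaba.alink.common.exceptions.AkIllegalArgumentException.
	static class AkIllegalArgumentException extends RuntimeException {
		AkIllegalArgumentException(String message) {
			super(message);
		}
	}

	static <T> List<T> requireNonEmpty(List<T> rows) {
		if (rows == null || rows.size() < 1) {
			throw new AkIllegalArgumentException("Values can not be empty.");
		}
		return rows;
	}
}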
61 changes: 14 additions & 47 deletions core/src/main/java/com/alibaba/alink/common/MTable.java
@@ -31,8 +31,10 @@
 import com.alibaba.alink.common.io.filesystem.binary.RowStreamSerializerV2;
 import com.alibaba.alink.common.linalg.VectorUtil;
 import com.alibaba.alink.common.linalg.tensor.TensorUtil;
+import com.alibaba.alink.common.type.AlinkTypes;
 import com.alibaba.alink.common.utils.JsonConverter;
 import com.alibaba.alink.common.utils.TableUtil;
+import com.alibaba.alink.common.viz.DataTypeDisplayInterface;
 import com.alibaba.alink.operator.common.io.csv.CsvFormatter;
 import com.alibaba.alink.operator.common.io.csv.CsvParser;
 import com.alibaba.alink.operator.common.statistics.basicstatistic.TableSummarizer;
@@ -197,60 +199,25 @@ public MTable select(int... colIndexes) {
 		return MTableUtil.select(this, colIndexes);
 	}
 
 	/**
 	 * summary for MTable.
 	 */
 	public TableSummary summary(String... selectedColNames) {
-		TableSchema schema = getSchema();
-		TableSummarizer srt = new TableSummarizer(
-			schema.getFieldNames(),
-			TableUtil.findColIndicesWithAssertAndHint(schema, getCalcCols(schema, selectedColNames)), true);
-		for (Row row : this.rows) {
-			srt.visit(row);
-		}
-		return srt.toSummary(selectedColNames);
+		return subSummary(selectedColNames, 0, this.getNumRow());
 	}
 
 	//summary for data from fromId line to endId line, include fromId and exclude endId.
 	public TableSummary subSummary(String[] selectedColNames, int fromId, int endId) {
-		TableSchema schema = getSchema();
-		TableSummarizer srt = new TableSummarizer(
-			schema.getFieldNames(),
-			TableUtil.findColIndicesWithAssertAndHint(schema, getCalcCols(schema, selectedColNames)), true);
-		for (int i = Math.max(fromId, 0); i < Math.min(endId, this.getNumRow()); i++) {
-			srt.visit(this.rows.get(i));
-		}
-		return srt.toSummary(selectedColNames);
-	}
-
-	/**
-	 * exclude columns that are not supported types and not in selected columns
-	 */
-	private static String[] getCalcCols(TableSchema tableSchema, String[] selectedColNames) {
-		ArrayList <String> calcCols = new ArrayList <>();
-		String[] inColNames = selectedColNames.length == 0 ? tableSchema.getFieldNames() : selectedColNames;
-		int[] colIndices = TableUtil.findColIndices(tableSchema, inColNames);
-		TypeInformation <?>[] inColTypes = tableSchema.getFieldTypes();
-
-		for (int i = 0; i < inColNames.length; i++) {
-			if (isSupportedType(inColTypes[colIndices[i]])) {
-				calcCols.add(inColNames[i]);
-			}
-		}
-
-		return calcCols.toArray(new String[0]);
-	}
-
-	private static boolean isSupportedType(TypeInformation <?> dataType) {
-		return Types.DOUBLE.equals(dataType)
-			|| Types.LONG.equals(dataType)
-			|| Types.BYTE.equals(dataType)
-			|| Types.INT.equals(dataType)
-			|| Types.FLOAT.equals(dataType)
-			|| Types.SHORT.equals(dataType)
-			|| Types.BIG_DEC.equals(dataType)
-			|| Types.BOOLEAN.equals(dataType);
-	}
+		if (null == selectedColNames || 0 == selectedColNames.length) {
+			selectedColNames = this.getColNames();
+		}
+		TableSchema schema = new TableSchema(selectedColNames, TableUtil.findColTypes(getSchema(), selectedColNames));
+		int[] selectColIndices = TableUtil.findColIndices(getSchema(), selectedColNames);
+		TableSummarizer srt = new TableSummarizer(schema, false);
+		for (int i = Math.max(fromId, 0); i < Math.min(endId, this.getNumRow()); i++) {
+			srt.visit(Row.project(this.rows.get(i), selectColIndices));
+		}
+		return srt.toSummary();
+	}
 
 	public void printSummary(String... selectedColNames) {
 		System.out.println(summary(selectedColNames).toString());
 	}
 
 	/**
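Note: summary() now delegates to subSummary(...) over the full row range, and subSummary visits only the selected columns through Flink's static Row.project instead of pre-filtering columns by type. A small runnable illustration of the projection step (the values are made up):

import org.apache.flink.types.Row;

public class ProjectSketch {
	public static void main(String[] args) {
		Row row = Row.of("a", 1L, 2.5);
		// Keep fields 1 and 2 only, as subSummary does with selectColIndices.
		Row projected = Row.project(row, new int[] {1, 2});
		// Prints the two projected fields; exact rendering depends on the Flink version.
		System.out.println(projected);
	}
}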
@@ -1,17 +1,24 @@
 package com.alibaba.alink.common.annotation;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.ml.api.misc.param.WithParams;
 
+import org.reflections.Reflections;
+
 import java.lang.annotation.Annotation;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.stream.Collectors;
 
 public class ParamAnnotationUtils {
+	final static String BASE_PARAMS_PKG_NAME = "com.alibaba.alink.params";
+	final static Class <?>[] PARAM_BASES = new Class[] {WithParams.class};
 
 	static List <Class <?>> getAllInterfaces(Class <?> clz) {
 		Set <Class <?>> visited = new HashSet <>();
@@ -118,4 +125,17 @@ public static HashSet <TypeInformation <?>> getAllowedTypes(ParamSelectColumnSpe
 		}
 		return s;
 	}
+
+	public static List <Class <?>> listParamInfos(Class <?>... bases) {
+		Reflections ref = new Reflections(BASE_PARAMS_PKG_NAME);
+		List <Class <?>> params = new ArrayList <>();
+		for (Class <?> base : bases) {
+			params.addAll(ref.getSubTypesOf(base));
+		}
+		return params.stream()
+			.filter(PublicOperatorUtils::isPublicUsable)
+			.sorted(Comparator.comparing(Class::toString))
+			.collect(Collectors.toList());
+	}
+
 }
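Note: the new listParamInfos scans the com.alibaba.alink.params package with the Reflections library, keeps only publicly usable classes, and sorts by class name for a stable order. A generic sketch of that scan (method and class names here are illustrative):

import org.reflections.Reflections;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ScanSketch {
	public static List<Class<?>> subtypesOf(String pkgName, Class<?> base) {
		Reflections ref = new Reflections(pkgName);
		List<Class<?>> found = new ArrayList<>(ref.getSubTypesOf(base));
		// Class has no natural ordering; sort by name for determinism.
		found.sort(Comparator.comparing(Class::toString));
		return found;
	}
}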
@@ -38,7 +38,14 @@ public enum PortDesc implements Internationalizable {
 	ASSOCIATION_PATTERNS,
 	ASSOCIATION_RULES,
 	GRPAH_EDGES,
-	GRAPH_VERTICES;
+	GRAPH_VERTICES,
+	INPUT_DICT_DATA,
+	INPUT_QUERY_DATA,
+	FEATURE_FREQUENCY,
+
+	SIMILAR_ITEM_PAIRS,
+	FEATURE_HASH_RESULTS,
+	MIN_HASH_RESULTS;
 
 	public static final ResourceBundle PORT_DESC_CN_BUNDLE = ResourceBundle.getBundle(
 		"i18n/port_desc", new Locale("zh", "CN"), new UTF8Control());
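Note: the enum adds port descriptions for the new dictionary/similarity/hash operators, and the surrounding class resolves constants against a locale-specific bundle (i18n/port_desc). A sketch of that lookup pattern, assuming the bundle keys match the constant names (the bundle layout here is illustrative):

import java.util.Locale;
import java.util.ResourceBundle;

public enum PortDescSketch {
	GRAPH_VERTICES,
	SIMILAR_ITEM_PAIRS;

	// Assumes a resource such as i18n/port_desc_zh_CN.properties with
	// lines like SIMILAR_ITEM_PAIRS=<description>.
	public String description(Locale locale) {
		return ResourceBundle.getBundle("i18n/port_desc", locale).getString(name());
	}
}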
@@ -1,6 +1,6 @@
 package com.alibaba.alink.common.annotation;
 
-import com.alibaba.alink.common.lazy.ExtractModelInfoBatchOp;
+import com.alibaba.alink.operator.batch.utils.ExtractModelInfoBatchOp;
 import com.alibaba.alink.operator.AlgoOperator;
 import com.alibaba.alink.pipeline.Pipeline;
 import com.alibaba.alink.pipeline.PipelineModel;
[Diff truncated: the remaining changed files of the 1,693 were not loaded.]
