[Linkis-datasource] Adjust the architecture of metadata service and add support of HDFS type in datasource (#3613)
Davidhua1996 authored Oct 20, 2022
1 parent 69ff2d8 commit a4edf1b
Showing 30 changed files with 1,103 additions and 180 deletions.
org/apache/linkis/datasourcemanager/common/DataSources.java (new file)
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.datasourcemanager.common;

import org.apache.linkis.common.conf.CommonVars;
import org.apache.linkis.datasourcemanager.common.domain.DataSource;
import org.apache.linkis.datasourcemanager.common.domain.DataSourceType;

import java.util.HashMap;
import java.util.Map;

public class DataSources {

  /** Default HDFS name */
  private static final CommonVars<String> DEFAULT_HDFS_NAME =
      CommonVars.apply("wds.linkis.server.dsm.default.hdfs.name", ".LOCAL_HDFS");

  private static final Map<String, DataSource> DEFAULT_DATASOURCES = new HashMap<>();

  static {
    DataSourceType hdfsType = new DataSourceType();
    hdfsType.setName("hdfs");
    DataSource hdfs = new DataSource();
    hdfs.setDataSourceType(hdfsType);
    hdfs.setDataSourceName(DEFAULT_HDFS_NAME.getValue());
    DEFAULT_DATASOURCES.put(hdfs.getDataSourceName(), hdfs);
    DEFAULT_DATASOURCES
        .values()
        .forEach(dataSource -> dataSource.setCreateUser(System.getProperty("user.name")));
  }

  /**
   * Find the default data source by name
   *
   * @param dataSourceName data source name
   * @return data source
   */
  public static DataSource getDefault(String dataSourceName) {
    return DEFAULT_DATASOURCES.get(dataSourceName);
  }
}
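Taken together, the new class is a static registry of built-in data sources keyed by name. A minimal lookup sketch (the `DataSourcesLookupExample` class is hypothetical and not part of this commit; it assumes the `wds.linkis.server.dsm.default.hdfs.name` property is left at its default `.LOCAL_HDFS`, and that `DataSource` exposes the usual bean getters matching the setters used above):

import org.apache.linkis.datasourcemanager.common.DataSources;
import org.apache.linkis.datasourcemanager.common.domain.DataSource;

public class DataSourcesLookupExample {
  public static void main(String[] args) {
    // The reserved name resolves to the built-in HDFS data source
    DataSource hdfs = DataSources.getDefault(".LOCAL_HDFS");
    System.out.println(hdfs.getDataSourceType().getName()); // hdfs
    System.out.println(hdfs.getCreateUser()); // the JVM user, per System.getProperty("user.name")

    // Any other name is not a built-in default, so the lookup returns null
    // and callers fall back to user-defined data sources
    System.out.println(DataSources.getDefault("my_mysql_source")); // null
  }
}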
org/apache/linkis/metadata/query/common/service/AbstractCacheMetaService.java (renamed from AbstractMetaService.java)
@@ -17,13 +17,10 @@

package org.apache.linkis.metadata.query.common.service;

-import org.apache.linkis.common.exception.WarnException;
import org.apache.linkis.datasourcemanager.common.util.json.Json;
import org.apache.linkis.metadata.query.common.cache.CacheConfiguration;
import org.apache.linkis.metadata.query.common.cache.CacheManager;
import org.apache.linkis.metadata.query.common.cache.ConnCacheManager;
-import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
-import org.apache.linkis.metadata.query.common.domain.MetaPartitionInfo;
import org.apache.linkis.metadata.query.common.exception.MetaRuntimeException;

import org.apache.commons.lang3.StringUtils;
@@ -34,7 +31,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
-import java.util.List;
+import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
@@ -45,8 +42,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public abstract class AbstractMetaService<C extends Closeable> implements MetadataService {
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractMetaService.class);
+/**
+ * Meta service that uses a cache manager
+ *
+ * @param <C> the cached connection type
+ */
+public abstract class AbstractCacheMetaService<C extends Closeable> implements BaseMetadataService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractCacheMetaService.class);
+
  private static final String CONN_CACHE_REQ = "_STORED";

private CacheManager connCacheManager;
@@ -62,140 +66,40 @@ public void init() {
  /**
   * If want to use cache component, you should invoke this in constructor method
   *
-   * @param cacheManager
+   * @param cacheManager cache manager
   */
  protected void initCache(CacheManager cacheManager) {
-    String prefix = this.getClass().getSimpleName();
-    reqCache =
-        cacheManager.buildCache(
-            prefix + CONN_CACHE_REQ,
-            notification -> {
-              assert notification.getValue() != null;
-              close(notification.getValue().getConnection());
-            });
-    // Clean up the req cache
-    reqCache.cleanUp();
-  }
-
-  @Override
-  public abstract MetadataConnection<C> getConnection(String operator, Map<String, Object> params)
-      throws Exception;
-
-  @Override
-  public List<String> getDatabases(String operator, Map<String, Object> params) {
-    return this.getConnAndRun(operator, params, this::queryDatabases);
-  }
-
-  @Override
-  public List<String> getTables(String operator, Map<String, Object> params, String database) {
-    return this.getConnAndRun(operator, params, conn -> this.queryTables(conn, database));
-  }
-
-  @Override
-  public Map<String, String> getTableProps(
-      String operator, Map<String, Object> params, String database, String table) {
-    return this.getConnAndRun(
-        operator, params, conn -> this.queryTableProps(conn, database, table));
-  }
-
-  @Override
-  public MetaPartitionInfo getPartitions(
-      String operator,
-      Map<String, Object> params,
-      String database,
-      String table,
-      boolean traverse) {
-    return this.getConnAndRun(
-        operator, params, conn -> this.queryPartitions(conn, database, table, traverse));
-  }
-
-  @Override
-  public List<MetaColumnInfo> getColumns(
-      String operator, Map<String, Object> params, String database, String table) {
-    return this.getConnAndRun(operator, params, conn -> this.queryColumns(conn, database, table));
-  }
-
-  @Override
-  public Map<String, String> getPartitionProps(
-      String operator,
-      Map<String, Object> params,
-      String database,
-      String table,
-      String partition) {
-    return this.getConnAndRun(
-        operator, params, conn -> this.queryPartitionProps(conn, database, table, partition));
-  }
-
-  /**
-   * Get database list by connection
-   *
-   * @param connection metadata connection
-   * @return
-   */
-  public List<String> queryDatabases(C connection) {
-    throw new WarnException(-1, "This method is no supported");
-  }
-
-  /**
-   * Get table list by connection and database
-   *
-   * @param connection metadata connection
-   * @param database database
-   * @return
-   */
-  public List<String> queryTables(C connection, String database) {
-    throw new WarnException(-1, "This method is no supported");
-  }
-
-  /**
-   * Get partitions by connection, database and table
-   *
-   * @param connection metadata connection
-   * @param database database
-   * @param table table
-   * @return
-   */
-  public MetaPartitionInfo queryPartitions(
-      C connection, String database, String table, boolean traverse) {
-    throw new WarnException(-1, "This method is no supported");
-  }
-
-  /**
-   * Get columns by connection, database and table
-   *
-   * @param connection metadata connection
-   * @param database database
-   * @param table table
-   * @return
-   */
-  public List<MetaColumnInfo> queryColumns(C connection, String database, String table) {
-    throw new WarnException(-1, "This method is no supported");
-  }
-
-  /**
-   * Get the properties of partition
-   *
-   * @param connection
-   * @param database
-   * @param table
-   * @param partition
-   * @return
-   */
-  public Map<String, String> queryPartitionProps(
-      C connection, String database, String table, String partition) {
-    throw new WarnException(-1, "This method is no supported");
-  }
-
-  /**
-   * Get table properties
-   *
-   * @param connection metadata connection
-   * @param database database
-   * @param table table
-   * @return
-   */
-  public Map<String, String> queryTableProps(C connection, String database, String table) {
-    throw new WarnException(-1, "This method is no supported");
-  }
+    if (useCache()) {
+      String prefix = this.getClass().getSimpleName();
+      reqCache =
+          cacheManager.buildCache(
+              prefix + CONN_CACHE_REQ,
+              notification -> {
+                assert notification.getValue() != null;
+                close(notification.getValue().getConnection());
+              });
+      // Clean up the req cache
+      reqCache.cleanUp();
+    }
+  }
+
+  /**
+   * Whether to use the cache
+   *
+   * @return boolean
+   */
+  protected boolean useCache() {
+    return true;
+  }
+
+  @Override
+  public abstract MetadataConnection<C> getConnection(String operator, Map<String, Object> params)
+      throws Exception;
+
+  public Map<String, String> getConnectionInfo(
+      String operator, Map<String, Object> params, Map<String, String> queryParams) {
+    return this.getConnAndRun(
+        operator, params, connection -> this.queryConnectionInfo(connection, queryParams));
+  }

  public void close(C connection) {
@@ -207,12 +111,25 @@ public void close(C connection) {
}
}

+  /**
+   * Get connection information
+   *
+   * @param connection connection
+   * @param queryParams query params
+   * @return map
+   */
+  public Map<String, String> queryConnectionInfo(C connection, Map<String, String> queryParams) {
+    return Collections.emptyMap();
+  }

protected <R> R getConnAndRun(
String operator, Map<String, Object> params, Function<C, R> action) {
String cacheKey = "";
MetadataConnection<C> connection = null;
try {
-      cacheKey = md5String(Json.toJson(params, null), "", 2);
+      // Divide the cache by operator/creator
+      cacheKey = operator + "_" + md5String(Json.toJson(params, null), "", 2);
if (null != reqCache) {
ConnectionCache<C> connectionCache =
getConnectionInCache(reqCache, cacheKey, () -> getConnection(operator, params));
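With the cache logic now isolated in AbstractCacheMetaService, a concrete metadata service only has to supply a connection, and it may opt out of caching entirely. A hedged sketch of such a subclass (`HdfsLikeMetaService` and its `DummyClient` connection are hypothetical; only `getConnection`, `useCache`, and `queryConnectionInfo` and their signatures come from the diff above, and the single-argument `MetadataConnection` constructor is assumed):

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.apache.linkis.metadata.query.common.service.AbstractCacheMetaService;
import org.apache.linkis.metadata.query.common.service.MetadataConnection;

public class HdfsLikeMetaService extends AbstractCacheMetaService<HdfsLikeMetaService.DummyClient> {

  // Hypothetical connection type; any Closeable can serve as the cached value
  public static class DummyClient implements Closeable {
    @Override
    public void close() throws IOException {}
  }

  @Override
  public MetadataConnection<DummyClient> getConnection(String operator, Map<String, Object> params)
      throws Exception {
    // Wrap a fresh client; the base class decides whether to cache it under operator + params
    return new MetadataConnection<>(new DummyClient());
  }

  @Override
  protected boolean useCache() {
    // Opt out of the request cache: initCache(...) then never builds reqCache,
    // and the base class manages the connection per call instead
    return false;
  }

  @Override
  public Map<String, String> queryConnectionInfo(DummyClient connection, Map<String, String> queryParams) {
    // Returned verbatim by getConnectionInfo(operator, params, queryParams)
    return Collections.singletonMap("status", "ok");
  }
}

Overriding `useCache()` to return false is the escape hatch the refactoring adds for connection types that are cheap to create or managed externally, which is what the HDFS support in this commit needs; the old AbstractMetaService cached unconditionally.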