Skip to content

Commit

Permalink
[improvement](jdbc catalog) Change JdbcExecutor's error reporting fro…
Browse files Browse the repository at this point in the history
…m UDF to JDBC (apache#35692)

In the initial version, JdbcExecutor directly used UdfRuntimeException,
which could lead to misunderstanding of the exception. Therefore, I
created a separate Exception for JdbcExecutor to help us view the
exception more clearly.
  • Loading branch information
zy-kkk authored Jul 11, 2024
1 parent 3c1488c commit 0c54cc3
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.cloud.security.SecurityChecker;
import org.apache.doris.common.exception.InternalException;
import org.apache.doris.common.exception.UdfRuntimeException;
import org.apache.doris.common.jni.utils.UdfUtils;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnValueConverter;
Expand Down Expand Up @@ -155,31 +154,31 @@ public void cleanDataSource() {
}
}

public void testConnection() throws UdfRuntimeException {
public void testConnection() throws JdbcExecutorException {
try {
resultSet = ((PreparedStatement) stmt).executeQuery();
if (!resultSet.next()) {
throw new UdfRuntimeException(
throw new JdbcExecutorException(
"Failed to test connection in BE: query executed but returned no results.");
}
} catch (SQLException e) {
throw new UdfRuntimeException("Failed to test connection in BE: ", e);
throw new JdbcExecutorException("Failed to test connection in BE: ", e);
}
}

public int read() throws UdfRuntimeException {
public int read() throws JdbcExecutorException {
try {
resultSet = ((PreparedStatement) stmt).executeQuery();
resultSetMetaData = resultSet.getMetaData();
int columnCount = resultSetMetaData.getColumnCount();
block = new ArrayList<>(columnCount);
return columnCount;
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor sql has error: ", e);
throw new JdbcExecutorException("JDBC executor sql has error: ", e);
}
}

public long getBlockAddress(int batchSize, Map<String, String> outputParams) throws UdfRuntimeException {
public long getBlockAddress(int batchSize, Map<String, String> outputParams) throws JdbcExecutorException {
try {
if (outputTable != null) {
outputTable.close();
Expand Down Expand Up @@ -221,7 +220,7 @@ public long getBlockAddress(int batchSize, Map<String, String> outputParams) thr
}
} catch (Exception e) {
LOG.warn("jdbc get block address exception: ", e);
throw new UdfRuntimeException("jdbc get block address: ", e);
throw new JdbcExecutorException("jdbc get block address: ", e);
} finally {
block.clear();
}
Expand All @@ -235,63 +234,63 @@ protected void initializeBlock(int columnCount, String[] replaceStringList, int
}
}

public int write(Map<String, String> params) throws UdfRuntimeException {
public int write(Map<String, String> params) throws JdbcExecutorException {
VectorTable batchTable = VectorTable.createReadableTable(params);
// Can't release or close batchTable, it's released by c++
try {
insert(batchTable);
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor sql has error: ", e);
throw new JdbcExecutorException("JDBC executor sql has error: ", e);
}
return batchTable.getNumRows();
}

public void openTrans() throws UdfRuntimeException {
public void openTrans() throws JdbcExecutorException {
try {
if (conn != null) {
conn.setAutoCommit(false);
}
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor open transaction has error: ", e);
throw new JdbcExecutorException("JDBC executor open transaction has error: ", e);
}
}

public void commitTrans() throws UdfRuntimeException {
public void commitTrans() throws JdbcExecutorException {
try {
if (conn != null) {
conn.commit();
}
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor commit transaction has error: ", e);
throw new JdbcExecutorException("JDBC executor commit transaction has error: ", e);
}
}

public void rollbackTrans() throws UdfRuntimeException {
public void rollbackTrans() throws JdbcExecutorException {
try {
if (conn != null) {
conn.rollback();
}
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor rollback transaction has error: ", e);
throw new JdbcExecutorException("JDBC executor rollback transaction has error: ", e);
}
}

public int getCurBlockRows() {
return curBlockRows;
}

public boolean hasNext() throws UdfRuntimeException {
public boolean hasNext() throws JdbcExecutorException {
try {
if (resultSet == null) {
return false;
}
return resultSet.next();
} catch (SQLException e) {
throw new UdfRuntimeException("resultSet to get next error: ", e);
throw new JdbcExecutorException("resultSet to get next error: ", e);
}
}

private void init(JdbcDataSourceConfig config, String sql) throws UdfRuntimeException {
private void init(JdbcDataSourceConfig config, String sql) throws JdbcExecutorException {
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
String hikariDataSourceKey = config.createCacheKey();
try {
Expand Down Expand Up @@ -341,13 +340,14 @@ private void init(JdbcDataSourceConfig config, String sql) throws UdfRuntimeExce
initializeStatement(conn, config, sql);

} catch (MalformedURLException e) {
throw new UdfRuntimeException("MalformedURLException to load class about " + config.getJdbcDriverUrl(), e);
throw new JdbcExecutorException("MalformedURLException to load class about "
+ config.getJdbcDriverUrl(), e);
} catch (SQLException e) {
throw new UdfRuntimeException("Initialize datasource failed: ", e);
throw new JdbcExecutorException("Initialize datasource failed: ", e);
} catch (FileNotFoundException e) {
throw new UdfRuntimeException("FileNotFoundException failed: ", e);
throw new JdbcExecutorException("FileNotFoundException failed: ", e);
} catch (Exception e) {
throw new UdfRuntimeException("Initialize datasource failed: ", e);
throw new JdbcExecutorException("Initialize datasource failed: ", e);
} finally {
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,24 @@

package org.apache.doris.jdbc;

import org.apache.doris.common.exception.UdfRuntimeException;

import java.util.Map;

public interface JdbcExecutor {
int read() throws UdfRuntimeException;
int read() throws JdbcExecutorException;

int write(Map<String, String> params) throws UdfRuntimeException;
int write(Map<String, String> params) throws JdbcExecutorException;

long getBlockAddress(int batchSize, Map<String, String> outputParams) throws UdfRuntimeException;
long getBlockAddress(int batchSize, Map<String, String> outputParams) throws JdbcExecutorException;

void close() throws UdfRuntimeException, Exception;
void close() throws JdbcExecutorException, Exception;

void openTrans() throws UdfRuntimeException;
void openTrans() throws JdbcExecutorException;

void commitTrans() throws UdfRuntimeException;
void commitTrans() throws JdbcExecutorException;

void rollbackTrans() throws UdfRuntimeException;
void rollbackTrans() throws JdbcExecutorException;

int getCurBlockRows();

boolean hasNext() throws UdfRuntimeException;
boolean hasNext() throws JdbcExecutorException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.jdbc;

public class JdbcExecutorException extends Exception {
public JdbcExecutorException(String msg, Throwable cause) {
super(msg, cause);
}

public JdbcExecutorException(String msg) {
super(msg);
}
}

0 comments on commit 0c54cc3

Please sign in to comment.