From 3ae98ba9396f86e0033db09d143cf0e1c9b89bc2 Mon Sep 17 00:00:00 2001 From: vikasrathee-cs Date: Wed, 26 Jul 2023 23:25:41 +0530 Subject: [PATCH] PLUGIN-1181 added timeout field. Add a configurable connect timeout (in milliseconds, default 30000) to the FTP batch source: new connectTimeout config property, docs entry and widget field. --- docs/FTPSource-batchsource.md | 8 +++--- .../plugin/batch/source/ftp/FTPConfig.java | 27 ++++++++++++++++--- .../batch/source/ftp/FTPFileSystem.java | 6 +++++ .../batch/source/ftp/SFTPConnectionPool.java | 4 ++- .../batch/source/ftp/SFTPFileSystem.java | 10 ++++--- widgets/FTPSource-batchsource.json | 9 +++++++ 6 files changed, 53 insertions(+), 11 deletions(-) diff --git a/docs/FTPSource-batchsource.md b/docs/FTPSource-batchsource.md index f2e4518..56d59b0 100644 --- a/docs/FTPSource-batchsource.md +++ b/docs/FTPSource-batchsource.md @@ -23,7 +23,7 @@ Properties **Path:** Path to the file or directory to read from. For example: /path/to/directory. -**User:** User name to use for authentication. +**User:** Username to use for authentication. **Password:** Password to use for authentication. @@ -68,6 +68,8 @@ complete path and returns the list of files that match the specified pattern. **Allow Empty Input:** Identify if path needs to be ignored or not, for case when directory or file does not -exists. If set to true it will treat the not present folder as 0 input and log a warning. Default is false. +exist. If set to true it will treat the not present folder as 0 input and log a warning. Default is false. -**File System Properties:** Additional properties to use with the InputFormat when reading the data. \ No newline at end of file +**File System Properties:** Additional properties to use with the InputFormat when reading the data. + +**Connect Timeout:** Maximum time in milliseconds to wait for connection initialization before it times out. Default is 30000. 
diff --git a/src/main/java/io/cdap/plugin/batch/source/ftp/FTPConfig.java b/src/main/java/io/cdap/plugin/batch/source/ftp/FTPConfig.java index 4942bc4..3503e8d 100644 --- a/src/main/java/io/cdap/plugin/batch/source/ftp/FTPConfig.java +++ b/src/main/java/io/cdap/plugin/batch/source/ftp/FTPConfig.java @@ -145,13 +145,18 @@ public class FTPConfig extends PluginConfig implements FileSourceProperties { "value will only be used if the format is 'csv', 'tsv' or 'delimited'. The default value is false.") protected Boolean enableMultilineSupport; + @Macro + @Nullable + @Description("Maximum time in milliseconds to wait for connection initialization before time out.") + private final Integer connectTimeout; + @VisibleForTesting private FTPConfig(@Nullable String referenceName, String type, String host, @Nullable Integer port, String path, String user, String password, @Nullable String fileSystemProperties, @Nullable Boolean ignoreNonExistingFolders, @Nullable String fileRegex, @Nullable Boolean skipHeader, @Nullable String format, @Nullable String schema, @Nullable String delimiter, @Nullable Boolean enableQuotedValues, - @Nullable Boolean enableMultilineSupport) { + @Nullable Boolean enableMultilineSupport, @Nullable Integer connectTimeout) { this.referenceName = referenceName; this.type = type; this.host = host; @@ -168,6 +173,14 @@ private FTPConfig(@Nullable String referenceName, String type, String host, @Nul this.delimiter = delimiter; this.enableQuotedValues = enableQuotedValues; this.enableMultilineSupport = enableMultilineSupport; + this.connectTimeout = connectTimeout; + } + + public int getConnectTimeout() { + if (connectTimeout == null) { + return FTPFileSystem.DEFAULT_CONNECTION_TIMEOUT_MS; + } + return connectTimeout; } @Override @@ -186,10 +199,9 @@ public void validate(FailureCollector collector) { for (Map.Entry entry : location.getHadoopProperties().entrySet()) { conf.set(entry.getKey(), entry.getValue()); } + conf.setInt(FTPFileSystem.FS_CONNECT_TIMEOUT, 
getConnectTimeout()); try (FileSystem fs = JobUtils.applyWithExtraClassLoader(job, getClass().getClassLoader(), f -> FileSystem.get(location.getURI(), conf))) { - // TODO: Add setTimeout option in the future - // https://cdap.atlassian.net/browse/PLUGIN-1181 fs.getFileStatus(new Path(location.getURI())); } } catch (Exception e) { @@ -371,6 +383,7 @@ static class Builder { private boolean skipHeader; private boolean enableQuotedValues; private boolean enableMultilineSupport; + private Integer connectTimeout; Builder() { this.fileSystemProperties = new HashMap<>(); @@ -379,6 +392,7 @@ static class Builder { this.enableMultilineSupport = false; this.skipHeader = false; this.format = "text"; + this.connectTimeout = FTPFileSystem.DEFAULT_CONNECTION_TIMEOUT_MS; } Builder setType(FTPLocation.Type type) { @@ -462,11 +476,16 @@ Builder setEnableMultilineSupport(boolean enableMultilineSupport) { return this; } + Builder setConnectTimeout(Integer connectTimeout) { + this.connectTimeout = connectTimeout; + return this; + } + FTPConfig build() { return new FTPConfig(referenceName, type.name(), host, port, path, user, password, GSON.toJson(fileSystemProperties), ignoreNonExistingFolders, fileRegex, skipHeader, format, schema == null ? 
null : schema.toString(), delimiter, enableQuotedValues, - enableMultilineSupport); + enableMultilineSupport, connectTimeout); } } } diff --git a/src/main/java/io/cdap/plugin/batch/source/ftp/FTPFileSystem.java b/src/main/java/io/cdap/plugin/batch/source/ftp/FTPFileSystem.java index b0e9993..7bbcf5d 100644 --- a/src/main/java/io/cdap/plugin/batch/source/ftp/FTPFileSystem.java +++ b/src/main/java/io/cdap/plugin/batch/source/ftp/FTPFileSystem.java @@ -53,6 +53,10 @@ public class FTPFileSystem extends FileSystem { public static final int DEFAULT_BLOCK_SIZE = 4 * 1024; + public static final Integer DEFAULT_CONNECTION_TIMEOUT_MS = 30000; + + public static final String FS_CONNECT_TIMEOUT = "fs.connect.timeout"; + private URI uri; /** @@ -111,7 +115,9 @@ private FTPClient connect() throws IOException { int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT); String user = conf.get("fs.ftp.user." + host); String password = conf.get("fs.ftp.password." + host); + int connectTimeout = conf.getInt(FS_CONNECT_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT_MS); client = new FTPClient(); + client.setConnectTimeout(connectTimeout); client.connect(host, port); int reply = client.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { diff --git a/src/main/java/io/cdap/plugin/batch/source/ftp/SFTPConnectionPool.java b/src/main/java/io/cdap/plugin/batch/source/ftp/SFTPConnectionPool.java index 824d7dc..0da4a0e 100644 --- a/src/main/java/io/cdap/plugin/batch/source/ftp/SFTPConnectionPool.java +++ b/src/main/java/io/cdap/plugin/batch/source/ftp/SFTPConnectionPool.java @@ -24,6 +24,7 @@ import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.HashMap; import java.util.HashSet; @@ -118,7 +119,7 @@ public synchronized void setMaxConnection(int maxConn) { } public ChannelSftp connect(String host, int port, String user, - String password, String keyFile) throws IOException { + String password, String 
keyFile, int timeout) throws IOException { // get connection from pool ConnectionInfo info = new ConnectionInfo(host, port, user); ChannelSftp channel = getFromPool(info); @@ -161,6 +162,7 @@ public ChannelSftp connect(String host, int port, String user, java.util.Properties config = new java.util.Properties(); config.put("StrictHostKeyChecking", "no"); + config.put("ConnectTimeout", String.valueOf(timeout)); session.setConfig(config); session.connect(); diff --git a/src/main/java/io/cdap/plugin/batch/source/ftp/SFTPFileSystem.java b/src/main/java/io/cdap/plugin/batch/source/ftp/SFTPFileSystem.java index b98369f..71c28e9 100644 --- a/src/main/java/io/cdap/plugin/batch/source/ftp/SFTPFileSystem.java +++ b/src/main/java/io/cdap/plugin/batch/source/ftp/SFTPFileSystem.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -83,11 +84,13 @@ public class SFTPFileSystem extends FileSystem { "Destination path %s already exist, cannot rename!"; public static final String E_FAILED_GETHOME = "Failed to get home directory"; public static final String E_FAILED_DISCONNECT = "Failed to disconnect"; + public static final Integer DEFAULT_CONNECTION_TIMEOUT_MS = 30000; + public static final String FS_CONNECT_TIMEOUT = "fs.connect.timeout"; /** * Set configuration from UI. * - * @param uri + * @param uriInfo * @param conf * @throws IOException */ @@ -145,9 +148,10 @@ private ChannelSftp connect() throws IOException { String user = conf.get(FS_SFTP_USER_PREFIX + host, null); String pwd = conf.get(FS_SFTP_PASSWORD_PREFIX + host + "." 
+ user, null); String keyFile = conf.get(FS_SFTP_KEYFILE, null); + int connectTimeout = conf.getInt(FS_CONNECT_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT_MS); ChannelSftp channel = - connectionPool.connect(host, port, user, pwd, keyFile); + connectionPool.connect(host, port, user, pwd, keyFile, connectTimeout); return channel; } @@ -155,7 +159,7 @@ private ChannelSftp connect() throws IOException { /** * Logout and disconnect the given channel. * - * @param client + * @param channel * @throws IOException */ private void disconnect(ChannelSftp channel) throws IOException { diff --git a/widgets/FTPSource-batchsource.json b/widgets/FTPSource-batchsource.json index 36cd067..a89ee3d 100644 --- a/widgets/FTPSource-batchsource.json +++ b/widgets/FTPSource-batchsource.json @@ -185,6 +185,15 @@ } ] } + }, + { + "widget-type": "number", + "label": "Connect Timeout (milliseconds)", + "name": "connectTimeout", + "widget-attributes": { + "min": "0", + "default": "30000" + } } ] }