Skip to content

Commit

Permalink
Merge pull request #45 from data-integrations/username-colon
Browse files Browse the repository at this point in the history
PLUGIN-1181 added timeout field.
  • Loading branch information
albertshau authored Jul 27, 2023
2 parents 1f8c89d + 3ae98ba commit e4a511c
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 11 deletions.
8 changes: 5 additions & 3 deletions docs/FTPSource-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Properties

**Path:** Path to the file or directory to read from. For example: /path/to/directory.

**User:** User name to use for authentication.
**User:** Username to use for authentication.

**Password:** Password to use for authentication.

Expand Down Expand Up @@ -68,6 +68,8 @@ complete
path and returns the list of files that match the specified pattern.

**Allow Empty Input:** Identify if path needs to be ignored or not, for case when directory or file does not
exists. If set to true it will treat the not present folder as 0 input and log a warning. Default is false.
exist. If set to true it will treat the not present folder as 0 input and log a warning. Default is false.

**File System Properties:** Additional properties to use with the InputFormat when reading the data.
**File System Properties:** Additional properties to use with the InputFormat when reading the data.

**Connect Timeout:** Maximum time in milliseconds to wait for connection initialization before it times out.
27 changes: 23 additions & 4 deletions src/main/java/io/cdap/plugin/batch/source/ftp/FTPConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,18 @@ public class FTPConfig extends PluginConfig implements FileSourceProperties {
"value will only be used if the format is 'csv', 'tsv' or 'delimited'. The default value is false.")
protected Boolean enableMultilineSupport;

@Macro
@Nullable
@Description("Maximum time in milliseconds to wait for connection initialization before time out.")
private final Integer connectTimeout;

@VisibleForTesting
private FTPConfig(@Nullable String referenceName, String type, String host, @Nullable Integer port, String path,
String user, String password, @Nullable String fileSystemProperties,
@Nullable Boolean ignoreNonExistingFolders, @Nullable String fileRegex,
@Nullable Boolean skipHeader, @Nullable String format, @Nullable String schema,
@Nullable String delimiter, @Nullable Boolean enableQuotedValues,
@Nullable Boolean enableMultilineSupport) {
@Nullable Boolean enableMultilineSupport, @Nullable Integer connectTimeout) {
this.referenceName = referenceName;
this.type = type;
this.host = host;
Expand All @@ -168,6 +173,14 @@ private FTPConfig(@Nullable String referenceName, String type, String host, @Nul
this.delimiter = delimiter;
this.enableQuotedValues = enableQuotedValues;
this.enableMultilineSupport = enableMultilineSupport;
this.connectTimeout = connectTimeout;
}

public int getConnectTimeout() {
if (connectTimeout == null) {
return FTPFileSystem.DEFAULT_CONNECTION_TIMEOUT_MS;
}
return connectTimeout;
}

@Override
Expand All @@ -186,10 +199,9 @@ public void validate(FailureCollector collector) {
for (Map.Entry<String, String> entry : location.getHadoopProperties().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
conf.setInt(FTPFileSystem.FS_CONNECT_TIMEOUT, getConnectTimeout());
try (FileSystem fs = JobUtils.applyWithExtraClassLoader(job, getClass().getClassLoader(),
f -> FileSystem.get(location.getURI(), conf))) {
// TODO: Add setTimeout option in the future
// https://cdap.atlassian.net/browse/PLUGIN-1181
fs.getFileStatus(new Path(location.getURI()));
}
} catch (Exception e) {
Expand Down Expand Up @@ -371,6 +383,7 @@ static class Builder {
private boolean skipHeader;
private boolean enableQuotedValues;
private boolean enableMultilineSupport;
private Integer connectTimeout;

Builder() {
this.fileSystemProperties = new HashMap<>();
Expand All @@ -379,6 +392,7 @@ static class Builder {
this.enableMultilineSupport = false;
this.skipHeader = false;
this.format = "text";
this.connectTimeout = FTPFileSystem.DEFAULT_CONNECTION_TIMEOUT_MS;
}

Builder setType(FTPLocation.Type type) {
Expand Down Expand Up @@ -462,11 +476,16 @@ Builder setEnableMultilineSupport(boolean enableMultilineSupport) {
return this;
}

Builder setConnectTimeout(Integer connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}

FTPConfig build() {
return new FTPConfig(referenceName, type.name(), host, port, path, user, password,
GSON.toJson(fileSystemProperties), ignoreNonExistingFolders, fileRegex, skipHeader,
format, schema == null ? null : schema.toString(), delimiter, enableQuotedValues,
enableMultilineSupport);
enableMultilineSupport, connectTimeout);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public class FTPFileSystem extends FileSystem {

public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;

public static final Integer DEFAULT_CONNECTION_TIMEOUT_MS = 30000;

public static final String FS_CONNECT_TIMEOUT = "fs.connect.timeout";

private URI uri;

/**
Expand Down Expand Up @@ -111,7 +115,9 @@ private FTPClient connect() throws IOException {
int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT);
String user = conf.get("fs.ftp.user." + host);
String password = conf.get("fs.ftp.password." + host);
int connectTimeout = conf.getInt(FS_CONNECT_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT_MS);
client = new FTPClient();
client.setConnectTimeout(connectTimeout);
client.connect(host, port);
int reply = client.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -118,7 +119,7 @@ public synchronized void setMaxConnection(int maxConn) {
}

public ChannelSftp connect(String host, int port, String user,
String password, String keyFile) throws IOException {
String password, String keyFile, int timeout) throws IOException {
// get connection from pool
ConnectionInfo info = new ConnectionInfo(host, port, user);
ChannelSftp channel = getFromPool(info);
Expand Down Expand Up @@ -161,6 +162,7 @@ public ChannelSftp connect(String host, int port, String user,

java.util.Properties config = new java.util.Properties();
config.put("StrictHostKeyChecking", "no");
config.put("ConnectTimeout", String.valueOf(timeout));
session.setConfig(config);

session.connect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -83,11 +84,13 @@ public class SFTPFileSystem extends FileSystem {
"Destination path %s already exist, cannot rename!";
public static final String E_FAILED_GETHOME = "Failed to get home directory";
public static final String E_FAILED_DISCONNECT = "Failed to disconnect";
public static final Integer DEFAULT_CONNECTION_TIMEOUT_MS = 30000;
public static final String FS_CONNECT_TIMEOUT = "fs.connect.timeout";

/**
* Set configuration from UI.
*
* @param uri
* @param uriInfo
* @param conf
* @throws IOException
*/
Expand Down Expand Up @@ -145,17 +148,18 @@ private ChannelSftp connect() throws IOException {
String user = conf.get(FS_SFTP_USER_PREFIX + host, null);
String pwd = conf.get(FS_SFTP_PASSWORD_PREFIX + host + "." + user, null);
String keyFile = conf.get(FS_SFTP_KEYFILE, null);
int connectTimeout = conf.getInt(FS_CONNECT_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT_MS);

ChannelSftp channel =
connectionPool.connect(host, port, user, pwd, keyFile);
connectionPool.connect(host, port, user, pwd, keyFile, connectTimeout);

return channel;
}

/**
* Logout and disconnect the given channel.
*
* @param client
* @param channel
* @throws IOException
*/
private void disconnect(ChannelSftp channel) throws IOException {
Expand Down
9 changes: 9 additions & 0 deletions widgets/FTPSource-batchsource.json
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,15 @@
}
]
}
},
{
"widget-type": "number",
"label": "Connect Timeout (milliseconds)",
"name": "connectTimeout",
"widget-attributes": {
"min": "0",
"default": "30000"
}
}
]
}
Expand Down

0 comments on commit e4a511c

Please sign in to comment.