Packet corrupt #506

Open
huang-xuan opened this issue Feb 21, 2024 · 6 comments

Comments

@huang-xuan

I use commons-pool2 to build a custom connection pool. The first file upload works fine, but after a while the error "Packet corrupt" is reported. Is it related to session reuse?

@huang-xuan
Author

@Configuration
public class CvlSftpConfig {
@Autowired
private SftpProperties properties;

@Bean(destroyMethod = "close")
public SftpClient sftpClient() throws Exception {
    return new SftpClient(properties);
}

}

@Slf4j
public class JschUtil {
/**
* create session
*/
public static Session createSession(String userName, String password, String host, int port, String privateKeyFile, String passphrase) throws CustomException {
return createSession(new JSch(), userName, password, host, port, privateKeyFile, passphrase);
}

/**
 * create session
 */
public static Session createSession(JSch jSch, String userName, String password, String host, int port, String privateKeyFile, String passphrase) throws CustomException {
    try {
        if (!StringUtils.isEmpty(privateKeyFile)) {
            if (!StringUtils.isEmpty(passphrase)) {
                jSch.addIdentity(privateKeyFile, passphrase);
            } else {
                jSch.addIdentity(privateKeyFile);
            }
        }
        Session session = jSch.getSession(userName, host, port);
        if (!StringUtils.isEmpty(password)) {
            session.setPassword(password);
        }
        session.setConfig("StrictHostKeyChecking", "no");
        return session;
    } catch (Exception e) {
        throw new CustomException(ErrCode.ERR_BUSI_BASE, "create session fail");
    }
}

/**
 * create session
 */
public static Session createSession(JSch jSch, String userName, String password, String host, int port) throws CustomException {
    return createSession(jSch, userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY);
}

private static Session createSession(JSch jSch, String userName, String host, int port) throws CustomException {
    return createSession(jSch, userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY);
}

/**
 * open session
 */
public static Session openSession(JSch jSch, String userName, String password, String host, int port, String privateKeyFile, String passphrase, int timeout) throws CustomException {
    Session session = createSession(jSch, userName, password, host, port, privateKeyFile, passphrase);
    try {
        if (timeout >= 0) {
            session.connect(timeout);
        } else {
            session.connect();
        }
        return session;
    } catch (Exception e) {
        throw new CustomException(ErrCode.ERR_BUSI_BASE, "session connect fail");
    }
}

public static Session openSession(String userName, String password, String host, int port, String privateKeyFile, String passphrase, int timeout) throws CustomException {
    Session session = createSession(userName, password, host, port, privateKeyFile, passphrase);
    try {
        if (timeout >= 0) {
            session.connect(timeout);
        } else {
            session.connect();
        }
        return session;
    } catch (Exception e) {
        throw new CustomException(ErrCode.ERR_BUSI_BASE, e.getMessage());
    }
}

// public static Session openSession(JSch jSch, String userName, String password, String host, int port, int timeout) throws CustomException {
// return openSession(jSch, userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
// }
//
// public static Session openSession(String userName, String password, String host, int port, int timeout) throws CustomException {
// return openSession(userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
// }
//
// public static Session openSession(JSch jSch, String userName, String host, int port, int timeout) throws CustomException {
// return openSession(jSch, userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
// }
//
// public static Session openSession(String userName, String host, int port, int timeout) throws CustomException {
// return openSession(userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
// }

public static Channel createChannel(Session session, ChannelType channelType) throws CustomException {
    try {
        if (!session.isConnected()) {
            session.connect(1000);
            session.setServerAliveInterval(60000);
        }
        return session.openChannel(channelType.getValue());
    } catch (Exception e) {
        log.error("createChannel fail",e);
        throw new CustomException(ErrCode.ERR_BUSI_BASE, e.getMessage());
    }
}

/**
 * sftp
 *
 * @param session
 * @return
 */

// public static ChannelSftp createSftp(Session session) throws CustomException {
// return (ChannelSftp) createChannel(session, ChannelType.SFTP);
// }
//
// public static ChannelShell createShell(Session session) throws CustomException {
// return (ChannelShell) createChannel(session, ChannelType.SHELL);
// }

public static Channel openChannel(Session session, ChannelType channelType, int timeout) throws CustomException {
    Channel channel = createChannel(session, channelType);
    try {
        if (timeout >= 0) {
            channel.connect(timeout);
        } else {
            channel.connect();
        }
        return channel;
    } catch (Exception e) {
        throw new CustomException(ErrCode.ERR_BUSI_BASE, e.getMessage());
    }
}

public static ChannelSftp openSftpChannel(Session session, int timeout) throws CustomException {
    return (ChannelSftp) openChannel(session, ChannelType.SFTP, timeout);
}

// public static ChannelShell openShellChannel(Session session, int timeout) throws CustomException {
// return (ChannelShell) openChannel(session, ChannelType.SHELL, timeout);
// }

enum ChannelType {
    SESSION("session"),
    SHELL("shell"),
    EXEC("exec"),
    X11("x11"),
    AGENT_FORWARDING("auth-agent@openssh.com"),
    DIRECT_TCPIP("direct-tcpip"),
    FORWARDED_TCPIP("forwarded-tcpip"),
    SFTP("sftp"),
    SUBSYSTEM("subsystem");
    private final String value;

    ChannelType(String value) {
        this.value = value;
    }

    public String getValue() {
        return this.value;
    }
}

}

@Slf4j
public class SftpClient implements AutoCloseable {
private SftpFactory sftpFactory;
GenericObjectPool<ChannelSftp> objectPool;

public SftpClient(SftpProperties properties, GenericObjectPoolConfig<ChannelSftp> poolConfig) throws Exception {
    this.sftpFactory = new SftpFactory(properties);
    objectPool = new GenericObjectPool<>(this.sftpFactory, poolConfig);
}

public SftpClient(SftpProperties properties) throws Exception {
    this.sftpFactory = new SftpFactory(properties);
    SftpProperties.PoolConfig config = properties.getPool();
    if (Objects.isNull(config)) {
        objectPool = new GenericObjectPool<>(this.sftpFactory);
    } else {
        GenericObjectPoolConfig<ChannelSftp> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxIdle(config.getMaxIdle());
        poolConfig.setMaxTotal(config.getMaxTotal());
        poolConfig.setMinIdle(config.getMinIdle());
        poolConfig.setTestOnBorrow(config.isTestOnBorrow());
        poolConfig.setTestOnCreate(config.isTestOnCreate());
        poolConfig.setTestOnReturn(config.isTestOnReturn());
        poolConfig.setTestWhileIdle(config.isTestWhileIdle());
        poolConfig.setBlockWhenExhausted(config.isBlockWhenExhausted());
        poolConfig.setMaxWait(Duration.ofMillis(config.getMaxWaitMillis()));
        poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(config.getTimeBetweenEvictionRunsMillis()));
        objectPool = new GenericObjectPool<>(this.sftpFactory, poolConfig);
    }
}
public void changeConfig() throws Exception {
    this.sftpFactory.reset();
}

@Override
public void close() throws Exception {
    if (Objects.nonNull(this.objectPool)) {
        if (!this.objectPool.isClosed()) {
            this.objectPool.close();
        }
    }
    this.objectPool = null;
    if (Objects.nonNull(this.sftpFactory)) {
        this.sftpFactory.close();
    }
}

/**
 * Upload files to target folder
 *
 * @param in
 * @param targetDir
 * @param targetFileName
 * @return
 * @throws CustomException
 */
public boolean uploadFile(InputStream in, String targetDir, String targetFileName) throws CustomException {
    return uploadFile(in, targetDir, targetFileName, null);
}

/**
 * Upload files, add progress monitor
 *
 * @param in
 * @param targetDir
 * @param targetFileName
 * @param monitor
 * @return
 * @throws CustomException
 */
public boolean uploadFile(InputStream in, String targetDir, String targetFileName, SftpProgressMonitor monitor) throws CustomException {
    ChannelSftp channelSftp = null;
    try {
        channelSftp = this.objectPool.borrowObject();
        this.sftpFactory.create();
        if (!exist(channelSftp, targetDir)) {
            mkdirs(channelSftp, targetDir);
        }else {
            channelSftp.cd(channelSftp.getHome()+"/"+targetDir);
        }
        if (Objects.nonNull(monitor)) {
            channelSftp.put(in, targetFileName, monitor);
        } else {
            channelSftp.put(in, targetFileName);
        }
        return true;
    } catch (Exception e) {
        log.error("sftp fail {}",e.getMessage(),e);
        throw new CustomException(ErrCode.ERR_BUSI_BASE, e.getMessage());
    } finally {
        if (Objects.nonNull(channelSftp)) {
            this.objectPool.returnObject(channelSftp);
        }
    }
}

/**
 * download file
 *
 * @param remoteFile
 * @param targetFilePath
 * @return
 * @throws CustomException
 */
public boolean downloadFile(String remoteFile, String targetFilePath) throws CustomException {
    return downloadFile(remoteFile, targetFilePath, null);
}

/**
 * Download the target file to local
 *
 * @param remoteFile
 * @param targetFilePath
 * @return
 * @throws CustomException
 */
public boolean downloadFile(String remoteFile, String targetFilePath, SftpProgressMonitor monitor) throws CustomException {
    ChannelSftp channelSftp = null;
    try {
        channelSftp = this.objectPool.borrowObject();
        if (!exist(channelSftp, remoteFile)) {
            return false;
        }
        File targetFile = new File(targetFilePath);
        try (FileOutputStream outputStream = new FileOutputStream(targetFile)) {
            if (Objects.nonNull(monitor)) {
                channelSftp.get(remoteFile, outputStream, monitor);
            } else {
                channelSftp.get(remoteFile, outputStream);
            }
        }
        return true;
    } catch (Exception e) {
        throw new CustomException(ErrCode.ERR_BUSI_BASE, "download file fail");
    } finally {
        if (Objects.nonNull(channelSftp)) {
            this.objectPool.returnObject(channelSftp);
        }
    }
}

/**
 * download file
 *
 * @param remoteFile
 * @param outputStream
 * @return
 * @throws CustomException
 */
public boolean downloadFile(String remoteFile, OutputStream outputStream) throws CustomException {
    return downloadFile(remoteFile, outputStream, null);
}

/**
 * download file
 *
 * @param remoteFile
 * @param outputStream
 * @param monitor
 * @return
 * @throws CustomException
 */
public boolean downloadFile(String remoteFile, OutputStream outputStream, SftpProgressMonitor monitor) throws CustomException {
    ChannelSftp channelSftp = null;
    try {
        channelSftp = this.objectPool.borrowObject();
        if (!exist(channelSftp, remoteFile)) {
            return false;
        }
        if (Objects.nonNull(monitor)) {
            channelSftp.get(remoteFile, outputStream, monitor);
        } else {
            channelSftp.get(remoteFile, outputStream);
        }
        return true;
    } catch (Exception e) {
        throw new CustomException(ErrCode.ERR_BUSI_BASE, "download file fail");
    } finally {
        if (Objects.nonNull(channelSftp)) {
            this.objectPool.returnObject(channelSftp);
        }
    }
}

/**
 * Create folder
 *
 * @param channelSftp
 * @param dir
 * @return
 */
protected boolean mkdirs(ChannelSftp channelSftp, String dir) {
    try {
        // String pwd = channelSftp.pwd();
        // if (StringUtils.contains(pwd, dir)) {
        //     return true;
        // }
        channelSftp.cd(channelSftp.getHome());
        String[] dirs = StringUtils.splitByWholeSeparatorPreserveAllTokens(dir, "/");
        for (String path : dirs) {
            if (StringUtils.isBlank(path)) {
                continue;
            }
            try {
                channelSftp.cd(path);
            } catch (SftpException e) {
                channelSftp.mkdir(path);
                channelSftp.cd(path);
            }
        }
        return true;
    } catch (Exception e) {
        return false;
    }
}

/**
 * Determine whether the folder exists
 *
 * @param channelSftp
 * @param dir
 * @return
 */
protected boolean exist(ChannelSftp channelSftp, String dir) {
    try {
        channelSftp.lstat(channelSftp.getHome()+"/"+dir);
        return true;
    } catch (Exception e) {
        return false;
    }
}

}

public class SftpFactory extends BasePooledObjectFactory<ChannelSftp> implements AutoCloseable {
private Session session;
private SftpProperties properties;

SftpFactory(SftpProperties properties) throws Exception {
    this.properties = properties;
    String username = properties.getUsername();
    String password = properties.getPassword();
    String host = properties.getHost();
    int port = properties.getPort();
    String privateKeyFile = properties.getPrivateKeyFile();
    String passphrase = properties.getPassphrase();
    session = JschUtil.createSession(username, password, host, port, privateKeyFile, passphrase);
}

@Override
public void destroyObject(PooledObject<ChannelSftp> p) throws Exception {
    p.getObject().disconnect();
}

public void reset() throws Exception {
    session.disconnect();
    String username = properties.getUsername();
    String password = properties.getPassword();
    String host = properties.getHost();
    int port = properties.getPort();
    String privateKeyFile = properties.getPrivateKeyFile();
    String passphrase = properties.getPassphrase();
    session = JschUtil.createSession(username, password, host, port, privateKeyFile, passphrase);
}

@Override
public void close() throws Exception {
    if (Objects.nonNull(session)) {
        if (session.isConnected()) {
            session.disconnect();
        }
        session = null;
    }
}
@Override
public boolean validateObject(PooledObject<ChannelSftp> p) {
    return p.getObject().isConnected();
}

@Override
public ChannelSftp create() throws Exception {
    int timeout = properties.getTimeout();
    return JschUtil.openSftpChannel(this.session, timeout);
}

@Override
public PooledObject<ChannelSftp> wrap(ChannelSftp channelSftp) {
    return new DefaultPooledObject<>(channelSftp);
}

}

@Data
@Configuration
@RefreshScope
@ConfigurationProperties(prefix = "cvl.sftp.config")
public class SftpProperties {
private String username;
private String password;
private String host;
private int port;
private String privateKeyFile;
private String passphrase;
private int timeout;
private PoolConfig pool;
public String getPassword() {
return AesCoder.getAESDecodeResult(password, AdminPartnerConfigService.key);
}

@Data
public static class PoolConfig {
    private int maxIdle;
    private int minIdle;
    private int maxTotal;
    // Whether borrowObject blocks when the pool is exhausted (if false, an exception is thrown immediately when no object can be borrowed)
    private boolean blockWhenExhausted;
    // Takes effect only when blockWhenExhausted is true: the maximum time a blocked borrowObject call waits before timing out. If not set, the call does not time out. A per-call timeout can also be passed to borrowObject.
    private long maxWaitMillis;
    // Whether to validate an object right after it is created, by calling the factory's validateObject
    private boolean testOnCreate;
    // Whether to validate an object with validateObject when it is borrowed
    private boolean testOnBorrow;
    // Whether to validate an object with validateObject when it is returned
    private boolean testOnReturn;
    // Whether to validate idle objects with validateObject during the scheduled check
    private boolean testWhileIdle;
    // Interval of the scheduled check that removes surplus objects; the check runs on a separate thread
    private long timeBetweenEvictionRunsMillis;
    // Pool JMX monitoring conflicts with the JMX that ships with Spring Boot. Turn off either this setting or Spring Boot's JMX configuration.
    private boolean jmxEnabled;
    private boolean jmxEnabled;
}

}

@norrisjeremy
Contributor

Hi @huang-xuan,

You will need to provide more specifics about whatever exception or issue you are seeing in order for us to provide any sort of assistance.
For example, please provide a full Exception backtrace.
And additionally, you can enable logging in JSch and then capture that info and provide it as well.
See the JSch.setLogger() method, along with our several built-in logger types (JulLogger, Log4j2Logger & Slf4jLogger) depending upon whatever logging implementation is easiest to use with your application.

Thanks,
Jeremy

@theotherp

I have found that reconnecting to an older session causes this problem. Throw away the session and create and connect a new one.

@norrisjeremy
Contributor

I would highly discourage anyone from trying to reuse a Session object after it has been disconnected as I am confident that there are subtle bugs that will prevent it from working reliably.
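
For readers landing here, the "throw it away and create a new one" advice can be sketched roughly as follows. This is only an illustration under stated assumptions, not code from JSch or from the reporter's project: the class name `FreshOnDisconnect` is hypothetical, and it is written generically so it runs without JSch on the classpath. With JSch, `T` would be `Session`, the liveness check would be `Session::isConnected`, the closer would be `Session::disconnect`, and the factory would be whatever code creates and connects a brand-new `Session` (in the code above, presumably something like `JschUtil.openSession(...)`).

```java
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * Hands out a live connection object and replaces it with a brand-new one
 * whenever the current one is no longer alive. It never reconnects an old object.
 * Hypothetical helper for illustration only.
 */
final class FreshOnDisconnect<T> {
    private final Supplier<T> factory;   // must create AND connect a new instance
    private final Predicate<T> isAlive;  // with JSch: Session::isConnected
    private final Consumer<T> closer;    // with JSch: Session::disconnect
    private T current;

    FreshOnDisconnect(Supplier<T> factory, Predicate<T> isAlive, Consumer<T> closer) {
        this.factory = factory;
        this.isAlive = isAlive;
        this.closer = closer;
    }

    /** Returns the current instance if it is alive; otherwise discards it and builds a new one. */
    synchronized T get() {
        if (current != null && !isAlive.test(current)) {
            closer.accept(current); // release whatever the dead instance still holds
            current = null;         // drop the reference; it is never reconnected
        }
        if (current == null) {
            current = factory.get();
        }
        return current;
    }
}
```

In the code posted above, this would take the place of the `session.connect(1000)` call that `createChannel` makes when `session.isConnected()` is false. Any pooled `ChannelSftp` objects that were opened on the old session would presumably also have to be discarded at that point, since they belong to the session being thrown away.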

@theotherp

I think that could be communicated better (or at all?). Perhaps even throw an exception in that case?

@norrisjeremy
Contributor

> I think that could be communicated better (or at all?). Perhaps even throw an exception in that case?

No, because there are active users who, despite the subtle bugs, are reusing Session objects for whatever reason.
