Skip to content

Commit

Permalink
SNOW-480523 Fix Downscope URL for GCS (#634)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-japatel committed May 18, 2023
1 parent f26fb2f commit f8da3c5
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* @sfc-gh-japatel @sfc-gh-tzhang @sfc-gh-tjones @sfc-gh-rcheng
* @sfc-gh-japatel @sfc-gh-tzhang @sfc-gh-tjones @sfc-gh-rcheng @snowflakedb/streaming-ingest
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<groupId>com.snowflake</groupId>
<artifactId>snowflake-kafka-connector</artifactId>
<version>1.9.2</version>
<version>1.9.3</version>
<packaging>jar</packaging>
<name>Snowflake Kafka Connector</name>
<description>Snowflake Kafka Connect Sink Connector</description>
Expand Down Expand Up @@ -327,14 +327,14 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.29</version>
<version>3.13.30</version>
</dependency>

<!-- Ingest SDK for copy staged file into snowflake table -->
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>1.1.3</version>
<version>1.1.4</version>
<exclusions>
<exclusion>
<groupId>net.snowflake</groupId>
Expand Down
6 changes: 3 additions & 3 deletions pom_confluent.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<groupId>com.snowflake</groupId>
<artifactId>snowflake-kafka-connector</artifactId>
<version>1.9.2</version>
<version>1.9.3</version>
<packaging>jar</packaging>
<name>Snowflake Kafka Connector</name>
<description>Snowflake Kafka Connect Sink Connector</description>
Expand Down Expand Up @@ -379,14 +379,14 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.29</version>
<version>3.13.30</version>
</dependency>

<!-- Ingest SDK for copy staged file into snowflake table -->
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>1.1.3</version>
<version>1.1.4</version>
<exclusions>
<exclusion>
<groupId>net.snowflake</groupId>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/snowflake/kafka/connector/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
public class Utils {

// Connector version, change every release
public static final String VERSION = "1.9.2";
public static final String VERSION = "1.9.3";

// connector parameter list
public static final String NAME = "name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ public void putWithCache(
.setSnowflakeFileTransferMetadata(fileTransferMetadata)
.setUploadStream(inStream)
.setRequireCompress(true)
// Setting a destinationFileName is a no-op for AWS and Azure since it still uses
// presignedUrlFileName
// Setting destFileName is useful for GCS and downscope URL
.setDestFileName(FilenameUtils.getName(fullFilePath))
.setOcspMode(OCSPMode.FAIL_OPEN)
.setProxyProperties(proxyProperties)
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;

import com.google.common.annotations.VisibleForTesting;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
Expand All @@ -40,7 +41,6 @@
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ArrayNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.JsonNodeFactory;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;
import net.snowflake.ingest.internal.apache.arrow.util.VisibleForTesting;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
Expand Down
16 changes: 8 additions & 8 deletions test/test_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,33 +109,33 @@ def verifyWaitTime(self):
self.VERIFY_INTERVAL), flush=True)
sleep(self.VERIFY_INTERVAL)

def verifyWithRetry(self, func, round):
def verifyWithRetry(self, func, round, configFileName):
retryNum = 0
while retryNum < self.MAX_RETRY:
try:
func(round)
break
except test_suit.test_utils.ResetAndRetry:
retryNum = 0
print(datetime.now().strftime("%H:%M:%S "), "=== Reset retry count and retry ===", flush=True)
print(datetime.now().strftime("%H:%M:%S "), "=== Reset retry count and retry {}===".format(configFileName), flush=True)
except test_suit.test_utils.RetryableError as e:
retryNum += 1
print(datetime.now().strftime("%H:%M:%S "), "=== Failed, retryable. {}===".format(e.msg), flush=True)
print(datetime.now().strftime("%H:%M:%S "), "=== Failed {}, retryable. {}===".format(configFileName, e.msg), flush=True)
self.verifyWaitTime()
except test_suit.test_utils.NonRetryableError as e:
print(datetime.now().strftime("\n%H:%M:%S "), "=== Non retryable error raised ===\n{}".format(e.msg),
print(datetime.now().strftime("\n%H:%M:%S "), "=== Non retryable error for {} raised ===\n{}".format(configFileName, e.msg),
flush=True)
raise test_suit.test_utils.NonRetryableError()
except snowflake.connector.errors.ProgrammingError as e:
print("Error in VerifyWithRetry" + str(e))
print("Error in VerifyWithRetry for {}".format(configFileName) + str(e))
if e.errno == 2003:
retryNum += 1
print(datetime.now().strftime("%H:%M:%S "), "=== Failed, table not created ===", flush=True)
print(datetime.now().strftime("%H:%M:%S "), "=== Failed, table not created for {} ===".format(configFileName), flush=True)
self.verifyWaitTime()
else:
raise
if retryNum == self.MAX_RETRY:
print(datetime.now().strftime("\n%H:%M:%S "), "=== Max retry exceeded ===", flush=True)
print(datetime.now().strftime("\n%H:%M:%S "), "=== Max retry exceeded for {} ===".format(configFileName), flush=True)
raise test_suit.test_utils.NonRetryableError()

def createTopics(self, topicName, partitionNum=1, replicationNum=1):
Expand Down Expand Up @@ -551,7 +551,7 @@ def execution(testSet, testSuitList, testCleanEnableList, testSuitEnableList, dr
for i, test in enumerate(testSuitList):
if testSuitEnableList[i]:
print(datetime.now().strftime("\n%H:%M:%S "), "=== Verify " + test.__class__.__name__ + " ===")
driver.verifyWithRetry(test.verify, r)
driver.verifyWithRetry(test.verify, r, test.getConfigFileName())
print(datetime.now().strftime("%H:%M:%S "), "=== Passed " + test.__class__.__name__ + " ===",
flush=True)

Expand Down

0 comments on commit f8da3c5

Please sign in to comment.