Skip to content

Commit

Permalink
[release-1.11] Cherry-pick sql server fixes #2912 (#2920)
Browse files Browse the repository at this point in the history
Signed-off-by: Bernd Verst <github@bernd.dev>
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
berndverst and ItalyPaleAle authored Jun 20, 2023
1 parent 6d64a72 commit 802221b
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 258 deletions.
2 changes: 1 addition & 1 deletion .github/infrastructure/docker-compose-sqlserver.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '2'
services:
sqlserver:
image: mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04
image: mcr.microsoft.com/mssql/server:2019-latest
ports:
- "1433:1433"
environment:
Expand Down
12 changes: 6 additions & 6 deletions state/sqlserver/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func newMigration(metadata *sqlServerMetadata) migrator {
func (m *migration) newMigrationResult() migrationResult {
r := migrationResult{
itemRefTableTypeName: fmt.Sprintf("[%s].%s_Table", m.metadata.Schema, m.metadata.TableName),
upsertProcName: fmt.Sprintf("sp_Upsert_v3_%s", m.metadata.TableName),
getCommand: fmt.Sprintf("SELECT [Data], [RowVersion] FROM [%s].[%s] WHERE [Key] = @Key AND ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE())", m.metadata.Schema, m.metadata.TableName),
upsertProcName: fmt.Sprintf("sp_Upsert_v4_%s", m.metadata.TableName),
getCommand: fmt.Sprintf("SELECT [Data], [RowVersion], [ExpireDate] FROM [%s].[%s] WHERE [Key] = @Key AND ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE())", m.metadata.Schema, m.metadata.TableName),
deleteWithETagCommand: fmt.Sprintf(`DELETE [%s].[%s] WHERE [Key]=@Key AND [RowVersion]=@RowVersion`, m.metadata.Schema, m.metadata.TableName),
deleteWithoutETagCommand: fmt.Sprintf(`DELETE [%s].[%s] WHERE [Key]=@Key`, m.metadata.Schema, m.metadata.TableName),
}
Expand Down Expand Up @@ -296,7 +296,7 @@ func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *s
BEGIN
UPDATE [%[3]s]
SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END
WHERE [Key]=@Key AND RowVersion = @RowVersion AND (([RowVersion] IS NULL) OR ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE()))
WHERE [Key]=@Key AND RowVersion = @RowVersion
END
COMMIT;
END
Expand All @@ -316,7 +316,7 @@ func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *s
IF ERROR_NUMBER() IN (2601, 2627)
UPDATE [%[3]s]
SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) AND (([RowVersion] IS NULL) OR ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE()))
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion)
END CATCH
END
COMMIT;
Expand All @@ -328,7 +328,7 @@ func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *s
BEGIN
UPDATE [%[3]s]
SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END
WHERE [Key]=@Key AND RowVersion = @RowVersion AND (([RowVersion] IS NULL) OR ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE()))
WHERE [Key]=@Key AND RowVersion = @RowVersion
RETURN
END
ELSE
Expand All @@ -341,7 +341,7 @@ func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *s
IF ERROR_NUMBER() IN (2601, 2627)
UPDATE [%[3]s]
SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) AND (([RowVersion] IS NULL) OR ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE()))
WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion)
END CATCH
END
END
Expand Down
11 changes: 7 additions & 4 deletions state/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type IndexedProperty struct {
Type string `json:"type"`
}

// SQLServer defines a Ms SQL Server based state store.
// SQLServer defines a MS SQL Server based state store.
type SQLServer struct {
state.BulkStore

Expand Down Expand Up @@ -259,9 +259,12 @@ func (s *SQLServer) Get(ctx context.Context, req *state.GetRequest) (*state.GetR
return &state.GetResponse{}, nil
}

var data string
var rowVersion []byte
err = rows.Scan(&data, &rowVersion)
var (
data string
rowVersion []byte
expireDate sql.NullTime
)
err = rows.Scan(&data, &rowVersion, &expireDate)
if err != nil {
return nil, err
}
Expand Down
234 changes: 10 additions & 224 deletions state/sqlserver/sqlserver_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,8 @@ type userWithEtag struct {
etag string
}

func getMasterConnectionString() string {
return os.Getenv(connectionStringEnvKey)
}

func TestIntegrationCases(t *testing.T) {
connectionString := getMasterConnectionString()
connectionString := os.Getenv(connectionStringEnvKey)
if connectionString == "" {
t.Skipf("SQLServer state integration tests skipped. To enable define the connection string using environment variable '%s' (example 'export %s=\"server=localhost;user id=sa;password=Pass@Word1;port=1433;\")", connectionStringEnvKey, connectionStringEnvKey)
}
Expand All @@ -78,8 +74,6 @@ func TestIntegrationCases(t *testing.T) {
t.Run("Set New Record With Invalid Etag Should Fail", testSetNewRecordWithInvalidEtagShouldFail)
t.Run("Indexed Properties", testIndexedProperties)
t.Run("Multi operations", testMultiOperations)
t.Run("Bulk sets", testBulkSet)
t.Run("Bulk delete", testBulkDelete)
t.Run("Insert and Update Set Record Dates", testInsertAndUpdateSetRecordDates)
t.Run("Multiple initializations", testMultipleInitializations)

Expand All @@ -100,7 +94,7 @@ func getUniqueDBSchema() string {
func createMetadata(schema string, kt KeyType, indexedProperties string) state.Metadata {
metadata := state.Metadata{Base: metadata.Base{
Properties: map[string]string{
connectionStringKey: getMasterConnectionString(),
connectionStringKey: os.Getenv(connectionStringEnvKey),
schemaKey: schema,
tableNameKey: usersTableName,
keyTypeKey: string(kt),
Expand All @@ -125,8 +119,10 @@ func getTestStoreWithKeyType(t *testing.T, kt KeyType, indexedProperties string)
schema := getUniqueDBSchema()
metadata := createMetadata(schema, kt, indexedProperties)
store := &SQLServer{
logger: logger.NewLogger("test"),
logger: logger.NewLogger("test"),
migratorFactory: newMigration,
}
store.BulkStore = state.NewDefaultBulkStore(store)
err := store.Init(context.Background(), metadata)
require.NoError(t, err)

Expand Down Expand Up @@ -162,13 +158,9 @@ func assertUserDoesNotExist(t *testing.T, store *SQLServer, key string) {
}

func assertDBQuery(t *testing.T, store *SQLServer, query string, assertReader func(t *testing.T, rows *sql.Rows)) {
db, err := sql.Open("sqlserver", store.metadata.ConnectionString)
require.NoError(t, err)
defer db.Close()

rows, err := db.Query(query)
rows, err := store.db.Query(query)
require.NoError(t, err)
assert.Nil(t, rows.Err())
require.NoError(t, rows.Err())

defer rows.Close()
assertReader(t, rows)
Expand Down Expand Up @@ -493,214 +485,6 @@ func testMultiOperations(t *testing.T) {
}
}

func testBulkSet(t *testing.T) {
tests := []struct {
name string
kt KeyType
keyGen userKeyGenerator
}{
{"Bulk set string key type", StringKeyType, &numbericKeyGenerator{}},
{"Bulk set integer key type", IntegerKeyType, &numbericKeyGenerator{}},
{"Bulk set uuid key type", UUIDKeyType, &uuidKeyGenerator{}},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := getTestStoreWithKeyType(t, test.kt, "")
keyGen := test.keyGen

initialUsers := []user{
{keyGen.NextKey(), "John", "Coffee"},
{keyGen.NextKey(), "Laura", "Water"},
{keyGen.NextKey(), "Carl", "Beer"},
}

totalUsers := 0
userIndex := 0

t.Run("Add initial users", func(t *testing.T) {
sets := make([]state.SetRequest, len(initialUsers))
for i, u := range initialUsers {
sets[i] = state.SetRequest{Key: u.ID, Value: u}
}

err := store.BulkSet(context.Background(), sets, state.BulkStoreOpts{})
require.NoError(t, err)
totalUsers = len(sets)
assertUserCountIsEqualTo(t, store, totalUsers)
})

t.Run("Add 1, update 1 with valid etag", func(t *testing.T) {
toModify, toModifyETag := assertUserExists(t, store, initialUsers[userIndex].ID)
modified := toModify
modified.FavoriteBeverage = beverageTea
toInsert := user{keyGen.NextKey(), "Maria", "Wine"}

err := store.BulkSet(context.Background(), []state.SetRequest{
{Key: modified.ID, Value: modified, ETag: &toModifyETag},
{Key: toInsert.ID, Value: toInsert},
}, state.BulkStoreOpts{})
require.NoError(t, err)
assertLoadedUserIsEqual(t, store, modified.ID, modified)
assertLoadedUserIsEqual(t, store, toInsert.ID, toInsert)
totalUsers++
assertUserCountIsEqualTo(t, store, totalUsers)

userIndex++
})

t.Run("Add 1, update 1 without etag", func(t *testing.T) {
toModify := initialUsers[userIndex]
modified := toModify
modified.FavoriteBeverage = beverageTea
toInsert := user{keyGen.NextKey(), "Tony", "Milk"}

err := store.BulkSet(context.Background(), []state.SetRequest{
{Key: modified.ID, Value: modified},
{Key: toInsert.ID, Value: toInsert},
}, state.BulkStoreOpts{})
require.NoError(t, err)
assertLoadedUserIsEqual(t, store, modified.ID, modified)
assertLoadedUserIsEqual(t, store, toInsert.ID, toInsert)
totalUsers++
assertUserCountIsEqualTo(t, store, totalUsers)

userIndex++
})

t.Run("Failed upsert due to etag should be aborted", func(t *testing.T) {
toInsert1 := user{keyGen.NextKey(), "Ted1", "Beer"}
toInsert2 := user{keyGen.NextKey(), "Ted2", "Beer"}
toModify := initialUsers[userIndex]
modified := toModify
modified.FavoriteBeverage = beverageTea

invEtag := invalidEtag
sets := []state.SetRequest{
{Key: toInsert1.ID, Value: toInsert1},
{Key: toInsert2.ID, Value: toInsert2},
{Key: modified.ID, Value: modified, ETag: &invEtag},
}

err := store.BulkSet(context.Background(), sets, state.BulkStoreOpts{})
assert.NotNil(t, err)
assertUserCountIsEqualTo(t, store, totalUsers)
assertUserDoesNotExist(t, store, toInsert1.ID)
assertUserDoesNotExist(t, store, toInsert2.ID)
assertLoadedUserIsEqual(t, store, modified.ID, toModify)
assertUserCountIsEqualTo(t, store, totalUsers)
})
})
}
}

func testBulkDelete(t *testing.T) {
tests := []struct {
name string
kt KeyType
keyGen userKeyGenerator
}{
{"Bulk delete string key type", StringKeyType, &numbericKeyGenerator{}},
{"Bulk delete integer key type", IntegerKeyType, &numbericKeyGenerator{}},
{"Bulk delete uuid key type", UUIDKeyType, &uuidKeyGenerator{}},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := getTestStoreWithKeyType(t, test.kt, "")
keyGen := test.keyGen

initialUsers := []user{
{keyGen.NextKey(), "John", "Coffee"},
{keyGen.NextKey(), "Laura", "Water"},
{keyGen.NextKey(), "Carl", "Beer"},
{keyGen.NextKey(), "Maria", "Wine"},
{keyGen.NextKey(), "Mark", "Juice"},
{keyGen.NextKey(), "Sara", "Soda"},
{keyGen.NextKey(), "Tony", "Milk"},
{keyGen.NextKey(), "Hugo", "Juice"},
}

sets := make([]state.SetRequest, len(initialUsers))
for i, u := range initialUsers {
sets[i] = state.SetRequest{Key: u.ID, Value: u}
}
err := store.BulkSet(context.Background(), sets, state.BulkStoreOpts{})
require.NoError(t, err)
totalUsers := len(initialUsers)
assertUserCountIsEqualTo(t, store, totalUsers)

userIndex := 0

t.Run("Delete 2 items without etag should work", func(t *testing.T) {
deleted1 := initialUsers[userIndex].ID
deleted2 := initialUsers[userIndex+1].ID
err := store.BulkDelete(context.Background(), []state.DeleteRequest{
{Key: deleted1},
{Key: deleted2},
}, state.BulkStoreOpts{})
require.NoError(t, err)
totalUsers -= 2
assertUserCountIsEqualTo(t, store, totalUsers)
assertUserDoesNotExist(t, store, deleted1)
assertUserDoesNotExist(t, store, deleted2)

userIndex += 2
})

t.Run("Delete 2 items with etag should work", func(t *testing.T) {
deleted1, deleted1Etag := assertUserExists(t, store, initialUsers[userIndex].ID)
deleted2, deleted2Etag := assertUserExists(t, store, initialUsers[userIndex+1].ID)

err := store.BulkDelete(context.Background(), []state.DeleteRequest{
{Key: deleted1.ID, ETag: &deleted1Etag},
{Key: deleted2.ID, ETag: &deleted2Etag},
}, state.BulkStoreOpts{})
require.NoError(t, err)
totalUsers -= 2
assertUserCountIsEqualTo(t, store, totalUsers)
assertUserDoesNotExist(t, store, deleted1.ID)
assertUserDoesNotExist(t, store, deleted2.ID)

userIndex += 2
})

t.Run("Delete with/without etag should work", func(t *testing.T) {
deleted1, deleted1Etag := assertUserExists(t, store, initialUsers[userIndex].ID)
deleted2 := initialUsers[userIndex+1]

err := store.BulkDelete(context.Background(), []state.DeleteRequest{
{Key: deleted1.ID, ETag: &deleted1Etag},
{Key: deleted2.ID},
}, state.BulkStoreOpts{})
require.NoError(t, err)
totalUsers -= 2
assertUserCountIsEqualTo(t, store, totalUsers)
assertUserDoesNotExist(t, store, deleted1.ID)
assertUserDoesNotExist(t, store, deleted2.ID)

userIndex += 2
})

t.Run("Failed delete due to etag should be aborted", func(t *testing.T) {
deleted1, deleted1Etag := assertUserExists(t, store, initialUsers[userIndex].ID)
deleted2 := initialUsers[userIndex+1]

invEtag := invalidEtag
err := store.BulkDelete(context.Background(), []state.DeleteRequest{
{Key: deleted1.ID, ETag: &deleted1Etag},
{Key: deleted2.ID, ETag: &invEtag},
}, state.BulkStoreOpts{})
assert.NotNil(t, err)
assert.NotNil(t, err)
assertUserCountIsEqualTo(t, store, totalUsers)
assertUserExists(t, store, deleted1.ID)
assertUserExists(t, store, deleted2.ID)
})
})
}
}

/* #nosec. */
func testInsertAndUpdateSetRecordDates(t *testing.T) {
const maxDiffInMs = float64(500)
Expand Down Expand Up @@ -803,8 +587,10 @@ func testMultipleInitializations(t *testing.T) {
store := getTestStoreWithKeyType(t, test.kt, test.indexedProperties)

store2 := &SQLServer{
logger: logger.NewLogger("test"),
logger: logger.NewLogger("test"),
migratorFactory: newMigration,
}
store2.BulkStore = state.NewDefaultBulkStore(store2)
err := store2.Init(context.Background(), createMetadata(store.metadata.Schema, test.kt, test.indexedProperties))
assert.NoError(t, err)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/certification/state/sqlserver/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "3.7"
services:
sqlserver:
image: mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04
image: mcr.microsoft.com/mssql/server:2019-latest
ports:
- "1433:1433"
environment:
Expand Down
Loading

0 comments on commit 802221b

Please sign in to comment.