From 802221bdea50b3661507243595e9381835de38c2 Mon Sep 17 00:00:00 2001 From: Bernd Verst Date: Tue, 20 Jun 2023 09:11:09 -0700 Subject: [PATCH] [release-1.11] Cherry-pick sql server fixes #2912 (#2920) Signed-off-by: Bernd Verst Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> --- .../docker-compose-sqlserver.yml | 2 +- state/sqlserver/migration.go | 12 +- state/sqlserver/sqlserver.go | 11 +- state/sqlserver/sqlserver_integration_test.go | 234 +----------------- .../state/sqlserver/docker-compose.yml | 2 +- .../state/sqlserver/sqlserver_test.go | 46 ++-- 6 files changed, 49 insertions(+), 258 deletions(-) diff --git a/.github/infrastructure/docker-compose-sqlserver.yml b/.github/infrastructure/docker-compose-sqlserver.yml index 5b041506a3..f878ba221e 100644 --- a/.github/infrastructure/docker-compose-sqlserver.yml +++ b/.github/infrastructure/docker-compose-sqlserver.yml @@ -1,7 +1,7 @@ version: '2' services: sqlserver: - image: mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04 + image: mcr.microsoft.com/mssql/server:2019-latest ports: - "1433:1433" environment: diff --git a/state/sqlserver/migration.go b/state/sqlserver/migration.go index a7e3fef1bd..f8a0dd6b4e 100644 --- a/state/sqlserver/migration.go +++ b/state/sqlserver/migration.go @@ -46,8 +46,8 @@ func newMigration(metadata *sqlServerMetadata) migrator { func (m *migration) newMigrationResult() migrationResult { r := migrationResult{ itemRefTableTypeName: fmt.Sprintf("[%s].%s_Table", m.metadata.Schema, m.metadata.TableName), - upsertProcName: fmt.Sprintf("sp_Upsert_v3_%s", m.metadata.TableName), - getCommand: fmt.Sprintf("SELECT [Data], [RowVersion] FROM [%s].[%s] WHERE [Key] = @Key AND ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE())", m.metadata.Schema, m.metadata.TableName), + upsertProcName: fmt.Sprintf("sp_Upsert_v4_%s", m.metadata.TableName), + getCommand: fmt.Sprintf("SELECT [Data], [RowVersion], [ExpireDate] FROM [%s].[%s] WHERE [Key] = @Key AND ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE())", m.metadata.Schema, m.metadata.TableName), deleteWithETagCommand: fmt.Sprintf(`DELETE [%s].[%s] WHERE [Key]=@Key AND [RowVersion]=@RowVersion`, m.metadata.Schema, m.metadata.TableName), deleteWithoutETagCommand: fmt.Sprintf(`DELETE [%s].[%s] WHERE [Key]=@Key`, m.metadata.Schema, m.metadata.TableName), } @@ -296,7 +296,7 @@ func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *s BEGIN UPDATE [%[3]s] SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END - WHERE [Key]=@Key AND RowVersion = @RowVersion AND (([RowVersion] IS NULL) OR ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE())) + WHERE [Key]=@Key AND RowVersion = @RowVersion END COMMIT; END @@ -316,7 +316,7 @@ func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *s IF ERROR_NUMBER() IN (2601, 2627) UPDATE [%[3]s] SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END - WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) AND (([RowVersion] IS NULL) OR ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE())) + WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) END CATCH END COMMIT; @@ -328,7 +328,7 @@ func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *s BEGIN UPDATE [%[3]s] SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END - WHERE [Key]=@Key AND RowVersion = @RowVersion AND (([RowVersion] IS NULL) OR ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE())) + WHERE [Key]=@Key AND RowVersion = @RowVersion RETURN END ELSE @@ -341,7 +341,7 @@ func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *s IF ERROR_NUMBER() IN (2601, 2627) UPDATE [%[3]s] SET [Data]=@Data, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END - WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) AND (([RowVersion] IS NULL) OR ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE())) + WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) END CATCH END END diff --git a/state/sqlserver/sqlserver.go b/state/sqlserver/sqlserver.go index 89030fe369..9a504461fd 100644 --- a/state/sqlserver/sqlserver.go +++ b/state/sqlserver/sqlserver.go @@ -78,7 +78,7 @@ type IndexedProperty struct { Type string `json:"type"` } -// SQLServer defines a Ms SQL Server based state store. +// SQLServer defines a MS SQL Server based state store. type SQLServer struct { state.BulkStore @@ -259,9 +259,12 @@ func (s *SQLServer) Get(ctx context.Context, req *state.GetRequest) (*state.GetR return &state.GetResponse{}, nil } - var data string - var rowVersion []byte - err = rows.Scan(&data, &rowVersion) + var ( + data string + rowVersion []byte + expireDate sql.NullTime + ) + err = rows.Scan(&data, &rowVersion, &expireDate) if err != nil { return nil, err } diff --git a/state/sqlserver/sqlserver_integration_test.go b/state/sqlserver/sqlserver_integration_test.go index 83d0a027cd..ca7170db44 100644 --- a/state/sqlserver/sqlserver_integration_test.go +++ b/state/sqlserver/sqlserver_integration_test.go @@ -64,12 +64,8 @@ type userWithEtag struct { etag string } -func getMasterConnectionString() string { - return os.Getenv(connectionStringEnvKey) -} - func TestIntegrationCases(t *testing.T) { - connectionString := getMasterConnectionString() + connectionString := os.Getenv(connectionStringEnvKey) if connectionString == "" { t.Skipf("SQLServer state integration tests skipped. To enable define the connection string using environment variable '%s' (example 'export %s=\"server=localhost;user id=sa;password=Pass@Word1;port=1433;\")", connectionStringEnvKey, connectionStringEnvKey) } @@ -78,8 +74,6 @@ func TestIntegrationCases(t *testing.T) { t.Run("Set New Record With Invalid Etag Should Fail", testSetNewRecordWithInvalidEtagShouldFail) t.Run("Indexed Properties", testIndexedProperties) t.Run("Multi operations", testMultiOperations) - t.Run("Bulk sets", testBulkSet) - t.Run("Bulk delete", testBulkDelete) t.Run("Insert and Update Set Record Dates", testInsertAndUpdateSetRecordDates) t.Run("Multiple initializations", testMultipleInitializations) @@ -100,7 +94,7 @@ func getUniqueDBSchema() string { func createMetadata(schema string, kt KeyType, indexedProperties string) state.Metadata { metadata := state.Metadata{Base: metadata.Base{ Properties: map[string]string{ - connectionStringKey: getMasterConnectionString(), + connectionStringKey: os.Getenv(connectionStringEnvKey), schemaKey: schema, tableNameKey: usersTableName, keyTypeKey: string(kt), @@ -125,8 +119,10 @@ func getTestStoreWithKeyType(t *testing.T, kt KeyType, indexedProperties string) schema := getUniqueDBSchema() metadata := createMetadata(schema, kt, indexedProperties) store := &SQLServer{ - logger: logger.NewLogger("test"), + logger: logger.NewLogger("test"), + migratorFactory: newMigration, } + store.BulkStore = state.NewDefaultBulkStore(store) err := store.Init(context.Background(), metadata) require.NoError(t, err) @@ -162,13 +158,9 @@ func assertUserDoesNotExist(t *testing.T, store *SQLServer, key string) { } func assertDBQuery(t *testing.T, store *SQLServer, query string, assertReader func(t *testing.T, rows *sql.Rows)) { - db, err := sql.Open("sqlserver", store.metadata.ConnectionString) - require.NoError(t, err) - defer db.Close() - - rows, err := db.Query(query) + rows, err := store.db.Query(query) require.NoError(t, err) - assert.Nil(t, rows.Err()) + require.NoError(t, rows.Err()) defer rows.Close() assertReader(t, rows) @@ -493,214 +485,6 @@ func testMultiOperations(t *testing.T) { } } -func testBulkSet(t *testing.T) { - tests := []struct { - name string - kt KeyType - keyGen userKeyGenerator - }{ - {"Bulk set string key type", StringKeyType, &numbericKeyGenerator{}}, - {"Bulk set integer key type", IntegerKeyType, &numbericKeyGenerator{}}, - {"Bulk set uuid key type", UUIDKeyType, &uuidKeyGenerator{}}, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - store := getTestStoreWithKeyType(t, test.kt, "") - keyGen := test.keyGen - - initialUsers := []user{ - {keyGen.NextKey(), "John", "Coffee"}, - {keyGen.NextKey(), "Laura", "Water"}, - {keyGen.NextKey(), "Carl", "Beer"}, - } - - totalUsers := 0 - userIndex := 0 - - t.Run("Add initial users", func(t *testing.T) { - sets := make([]state.SetRequest, len(initialUsers)) - for i, u := range initialUsers { - sets[i] = state.SetRequest{Key: u.ID, Value: u} - } - - err := store.BulkSet(context.Background(), sets, state.BulkStoreOpts{}) - require.NoError(t, err) - totalUsers = len(sets) - assertUserCountIsEqualTo(t, store, totalUsers) - }) - - t.Run("Add 1, update 1 with valid etag", func(t *testing.T) { - toModify, toModifyETag := assertUserExists(t, store, initialUsers[userIndex].ID) - modified := toModify - modified.FavoriteBeverage = beverageTea - toInsert := user{keyGen.NextKey(), "Maria", "Wine"} - - err := store.BulkSet(context.Background(), []state.SetRequest{ - {Key: modified.ID, Value: modified, ETag: &toModifyETag}, - {Key: toInsert.ID, Value: toInsert}, - }, state.BulkStoreOpts{}) - require.NoError(t, err) - assertLoadedUserIsEqual(t, store, modified.ID, modified) - assertLoadedUserIsEqual(t, store, toInsert.ID, toInsert) - totalUsers++ - assertUserCountIsEqualTo(t, store, totalUsers) - - userIndex++ - }) - - t.Run("Add 1, update 1 without etag", func(t *testing.T) { - toModify := initialUsers[userIndex] - modified := toModify - modified.FavoriteBeverage = beverageTea - toInsert := user{keyGen.NextKey(), "Tony", "Milk"} - - err := store.BulkSet(context.Background(), []state.SetRequest{ - {Key: modified.ID, Value: modified}, - {Key: toInsert.ID, Value: toInsert}, - }, state.BulkStoreOpts{}) - require.NoError(t, err) - assertLoadedUserIsEqual(t, store, modified.ID, modified) - assertLoadedUserIsEqual(t, store, toInsert.ID, toInsert) - totalUsers++ - assertUserCountIsEqualTo(t, store, totalUsers) - - userIndex++ - }) - - t.Run("Failed upsert due to etag should be aborted", func(t *testing.T) { - toInsert1 := user{keyGen.NextKey(), "Ted1", "Beer"} - toInsert2 := user{keyGen.NextKey(), "Ted2", "Beer"} - toModify := initialUsers[userIndex] - modified := toModify - modified.FavoriteBeverage = beverageTea - - invEtag := invalidEtag - sets := []state.SetRequest{ - {Key: toInsert1.ID, Value: toInsert1}, - {Key: toInsert2.ID, Value: toInsert2}, - {Key: modified.ID, Value: modified, ETag: &invEtag}, - } - - err := store.BulkSet(context.Background(), sets, state.BulkStoreOpts{}) - assert.NotNil(t, err) - assertUserCountIsEqualTo(t, store, totalUsers) - assertUserDoesNotExist(t, store, toInsert1.ID) - assertUserDoesNotExist(t, store, toInsert2.ID) - assertLoadedUserIsEqual(t, store, modified.ID, toModify) - assertUserCountIsEqualTo(t, store, totalUsers) - }) - }) - } -} - -func testBulkDelete(t *testing.T) { - tests := []struct { - name string - kt KeyType - keyGen userKeyGenerator - }{ - {"Bulk delete string key type", StringKeyType, &numbericKeyGenerator{}}, - {"Bulk delete integer key type", IntegerKeyType, &numbericKeyGenerator{}}, - {"Bulk delete uuid key type", UUIDKeyType, &uuidKeyGenerator{}}, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - store := getTestStoreWithKeyType(t, test.kt, "") - keyGen := test.keyGen - - initialUsers := []user{ - {keyGen.NextKey(), "John", "Coffee"}, - {keyGen.NextKey(), "Laura", "Water"}, - {keyGen.NextKey(), "Carl", "Beer"}, - {keyGen.NextKey(), "Maria", "Wine"}, - {keyGen.NextKey(), "Mark", "Juice"}, - {keyGen.NextKey(), "Sara", "Soda"}, - {keyGen.NextKey(), "Tony", "Milk"}, - {keyGen.NextKey(), "Hugo", "Juice"}, - } - - sets := make([]state.SetRequest, len(initialUsers)) - for i, u := range initialUsers { - sets[i] = state.SetRequest{Key: u.ID, Value: u} - } - err := store.BulkSet(context.Background(), sets, state.BulkStoreOpts{}) - require.NoError(t, err) - totalUsers := len(initialUsers) - assertUserCountIsEqualTo(t, store, totalUsers) - - userIndex := 0 - - t.Run("Delete 2 items without etag should work", func(t *testing.T) { - deleted1 := initialUsers[userIndex].ID - deleted2 := initialUsers[userIndex+1].ID - err := store.BulkDelete(context.Background(), []state.DeleteRequest{ - {Key: deleted1}, - {Key: deleted2}, - }, state.BulkStoreOpts{}) - require.NoError(t, err) - totalUsers -= 2 - assertUserCountIsEqualTo(t, store, totalUsers) - assertUserDoesNotExist(t, store, deleted1) - assertUserDoesNotExist(t, store, deleted2) - - userIndex += 2 - }) - - t.Run("Delete 2 items with etag should work", func(t *testing.T) { - deleted1, deleted1Etag := assertUserExists(t, store, initialUsers[userIndex].ID) - deleted2, deleted2Etag := assertUserExists(t, store, initialUsers[userIndex+1].ID) - - err := store.BulkDelete(context.Background(), []state.DeleteRequest{ - {Key: deleted1.ID, ETag: &deleted1Etag}, - {Key: deleted2.ID, ETag: &deleted2Etag}, - }, state.BulkStoreOpts{}) - require.NoError(t, err) - totalUsers -= 2 - assertUserCountIsEqualTo(t, store, totalUsers) - assertUserDoesNotExist(t, store, deleted1.ID) - assertUserDoesNotExist(t, store, deleted2.ID) - - userIndex += 2 - }) - - t.Run("Delete with/without etag should work", func(t *testing.T) { - deleted1, deleted1Etag := assertUserExists(t, store, initialUsers[userIndex].ID) - deleted2 := initialUsers[userIndex+1] - - err := store.BulkDelete(context.Background(), []state.DeleteRequest{ - {Key: deleted1.ID, ETag: &deleted1Etag}, - {Key: deleted2.ID}, - }, state.BulkStoreOpts{}) - require.NoError(t, err) - totalUsers -= 2 - assertUserCountIsEqualTo(t, store, totalUsers) - assertUserDoesNotExist(t, store, deleted1.ID) - assertUserDoesNotExist(t, store, deleted2.ID) - - userIndex += 2 - }) - - t.Run("Failed delete due to etag should be aborted", func(t *testing.T) { - deleted1, deleted1Etag := assertUserExists(t, store, initialUsers[userIndex].ID) - deleted2 := initialUsers[userIndex+1] - - invEtag := invalidEtag - err := store.BulkDelete(context.Background(), []state.DeleteRequest{ - {Key: deleted1.ID, ETag: &deleted1Etag}, - {Key: deleted2.ID, ETag: &invEtag}, - }, state.BulkStoreOpts{}) - assert.NotNil(t, err) - assert.NotNil(t, err) - assertUserCountIsEqualTo(t, store, totalUsers) - assertUserExists(t, store, deleted1.ID) - assertUserExists(t, store, deleted2.ID) - }) - }) - } -} - /* #nosec. */ func testInsertAndUpdateSetRecordDates(t *testing.T) { const maxDiffInMs = float64(500) @@ -803,8 +587,10 @@ func testMultipleInitializations(t *testing.T) { store := getTestStoreWithKeyType(t, test.kt, test.indexedProperties) store2 := &SQLServer{ - logger: logger.NewLogger("test"), + logger: logger.NewLogger("test"), + migratorFactory: newMigration, } + store2.BulkStore = state.NewDefaultBulkStore(store2) err := store2.Init(context.Background(), createMetadata(store.metadata.Schema, test.kt, test.indexedProperties)) assert.NoError(t, err) }) diff --git a/tests/certification/state/sqlserver/docker-compose.yml b/tests/certification/state/sqlserver/docker-compose.yml index 951e7e578c..63510b7688 100644 --- a/tests/certification/state/sqlserver/docker-compose.yml +++ b/tests/certification/state/sqlserver/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.7" services: sqlserver: - image: mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04 + image: mcr.microsoft.com/mssql/server:2019-latest ports: - "1433:1433" environment: diff --git a/tests/certification/state/sqlserver/sqlserver_test.go b/tests/certification/state/sqlserver/sqlserver_test.go index a2ab8bb1b9..196444eb1e 100644 --- a/tests/certification/state/sqlserver/sqlserver_test.go +++ b/tests/certification/state/sqlserver/sqlserver_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "os" + "strconv" "testing" "time" @@ -66,32 +67,33 @@ func TestSqlServer(t *testing.T) { currentHTTPPort := ports[1] basicTest := func(ctx flow.Context) error { - client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort)) - if err != nil { - panic(err) - } - defer client.Close() - - // save state, default options: strong, last-write - err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("certificationdata"), nil) - require.NoError(ctx.T, err) + ctx.T.Run("basic test", func(t *testing.T) { + client, err := client.NewClientWithPort(strconv.Itoa(currentGrpcPort)) + if err != nil { + panic(err) + } + defer client.Close() - // get state - item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil) - require.NoError(ctx.T, err) - assert.Equal(ctx.T, "certificationdata", string(item.Value)) + // save state, default options: strong, last-write + err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("certificationdata"), nil) + require.NoError(t, err) - // delete state - err = client.DeleteState(ctx, stateStoreName, certificationTestPrefix+"key1", nil) - require.NoError(ctx.T, err) + // get state + item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil) + require.NoError(t, err) + assert.Equal(t, "certificationdata", string(item.Value)) + // delete state + err = client.DeleteState(ctx, stateStoreName, certificationTestPrefix+"key1", nil) + require.NoError(t, err) + }) return nil } // this test function heavily depends on the values defined in ./components/docker/customschemawithindex verifyIndexedPopertiesTest := func(ctx flow.Context) error { // verify indices were created by Dapr as specified in the component metadata - db, err := sql.Open("sqlserver", fmt.Sprintf("%sdatabase=certificationtest;", dockerConnectionString)) + db, err := sql.Open("mssql", fmt.Sprintf("%sdatabase=certificationtest;", dockerConnectionString)) require.NoError(ctx.T, err) defer db.Close() @@ -117,7 +119,7 @@ func TestSqlServer(t *testing.T) { assert.Equal(ctx.T, 3, indexFoundCount) // write JSON data to the state store (which will automatically be indexed in separate columns) - client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort)) + client, err := client.NewClientWithPort(strconv.Itoa(currentGrpcPort)) if err != nil { panic(err) } @@ -166,7 +168,7 @@ func TestSqlServer(t *testing.T) { // helper function for testing the use of an existing custom schema createCustomSchema := func(ctx flow.Context) error { - db, err := sql.Open("sqlserver", dockerConnectionString) + db, err := sql.Open("mssql", dockerConnectionString) assert.NoError(ctx.T, err) _, err = db.Exec("CREATE SCHEMA customschema;") assert.NoError(ctx.T, err) @@ -176,7 +178,7 @@ func TestSqlServer(t *testing.T) { // helper function to insure the SQL Server Docker Container is truly ready checkSQLServerAvailability := func(ctx flow.Context) error { - db, err := sql.Open("sqlserver", dockerConnectionString) + db, err := sql.Open("mssql", dockerConnectionString) if err != nil { return err } @@ -189,7 +191,7 @@ func TestSqlServer(t *testing.T) { // checks the state store component is not vulnerable to SQL injection verifySQLInjectionTest := func(ctx flow.Context) error { - client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort)) + client, err := client.NewClientWithPort(strconv.Itoa(currentGrpcPort)) if err != nil { panic(err) } @@ -280,7 +282,7 @@ func TestSqlServer(t *testing.T) { }) ctx.T.Run("cleanup", func(t *testing.T) { - dbClient, err := sql.Open("sqlserver", connString) + dbClient, err := sql.Open("mssql", connString) require.NoError(t, err) t.Run("automatically delete expiredate records", func(t *testing.T) {