diff --git a/go/test/endtoend/vreplication/reference_test.go b/go/test/endtoend/vreplication/reference_test.go index 40a8b1e39c3..2e560b22f0c 100644 --- a/go/test/endtoend/vreplication/reference_test.go +++ b/go/test/endtoend/vreplication/reference_test.go @@ -79,6 +79,7 @@ insert into cat values (3, 'c3'); insert into mfg values (1, 'm1'); insert into mfg values (2, 'm2'); insert into mfg values (3, 'm3'); +insert into mfg values (4, 'm4'); ` ) @@ -98,9 +99,12 @@ func TestReferenceTableMaterializationAndRouting(t *testing.T) { vc.AddKeyspace(t, []*Cell{defaultCell}, uks, "0", uksVSchema, uksSchema, defaultReplicas, defaultRdonly, 100, nil) vc.AddKeyspace(t, []*Cell{defaultCell}, sks, "-80,80-", sksVSchema, sksSchema, defaultReplicas, defaultRdonly, 200, nil) vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + verifyClusterHealth(t, vc) _, _, err = vtgateConn.ExecuteFetchMulti(initializeTables, 0, false) require.NoError(t, err) + vtgateConn.Close() + materialize(t, materializeCatSpec, false) materialize(t, materializeMfgSpec, false) @@ -111,21 +115,38 @@ func TestReferenceTableMaterializationAndRouting(t *testing.T) { catchup(t, tabDash80, "wfMfg", "Materialize Manufacturer") catchup(t, tab80Dash, "wfMfg", "Materialize Manufacturer") - vtgateConn.Close() vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() waitForRowCount(t, vtgateConn, sks, "cat", 3) waitForRowCount(t, vtgateConn, sks, "mfg2", 3) - insertQuery := "insert into mfg values (4, 'm4')" - _, err = vtgateConn.ExecuteFetch(insertQuery, 0, false) - require.Contains(t, err.Error(), "table mfg not found") + execRefQuery(t, "insert into mfg values (5, 'm5')") + execRefQuery(t, "insert into mfg2 values (6, 'm6')") + execRefQuery(t, "insert into uks.mfg values (7, 'm7')") + execRefQuery(t, "insert into sks.mfg2 values (8, 'm8')") + waitForRowCount(t, vtgateConn, uks, "mfg", 8) + + execRefQuery(t, "update mfg set name = concat(name, '-updated') where id = 1") + execRefQuery(t, "update mfg2 set name = concat(name, '-updated') where id = 2") + execRefQuery(t, "update uks.mfg set name = concat(name, '-updated') where id = 3") + execRefQuery(t, "update sks.mfg set name = concat(name, '-updated') where id = 4") - insertQuery = "insert into mfg2 values (4, 'm4')" - _, err = vtgateConn.ExecuteFetch(insertQuery, 0, false) - require.Contains(t, err.Error(), "Table 'vt_uks.mfg2' doesn't exist") + waitForRowCount(t, vtgateConn, uks, "mfg", 8) + qr := execVtgateQuery(t, vtgateConn, "uks", "select count(*) from uks.mfg where name like '%updated%'") + require.NotNil(t, qr) + require.Equal(t, "4", qr.Rows[0][0].ToString()) - insertQuery = "insert into uks.mfg values (4, 'm4')" - _, err = vtgateConn.ExecuteFetch(insertQuery, 0, false) + execRefQuery(t, "delete from mfg where id = 5") + execRefQuery(t, "delete from mfg2 where id = 6") + execRefQuery(t, "delete from uks.mfg where id = 7") + execRefQuery(t, "delete from sks.mfg where id = 8") + waitForRowCount(t, vtgateConn, uks, "mfg", 4) + +} + +func execRefQuery(t *testing.T, query string) { + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + _, err := vtgateConn.ExecuteFetch(query, 0, false) require.NoError(t, err) }