diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 23403395a8..eff626ee5e 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -793,7 +793,7 @@ func (jd *Handle) init() { } if jd.conf.payloadColumnType == 0 { - jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, "JobsDB."+jd.tablePrefix+"."+string(jd.ownerType)+".payloadColumnType", "JobsDB."+jd.tablePrefix+".payloadColumnType")) + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, "JobsDB.payloadColumnType")) } if jd.stats == nil { @@ -1083,8 +1083,59 @@ func (jd *Handle) writerSetup(ctx context.Context, l lock.LockToken) { jd.assertError(jd.doRefreshDSRangeList(l)) // If no DS present, add one - if len(jd.getDSList()) == 0 { + var createDS bool + var updateColumnType bool + dsList := jd.getDSList() + if len(dsList) == 0 { + createDS = true + } else { + // first check column type + var columnType string + err := jd.dbHandle.QueryRowContext( + ctx, + fmt.Sprintf( + `select data_type + from information_schema.columns + where table_name = '%[1]s' and column_name='event_payload';`, + dsList[len(dsList)-1].JobTable, + ), + ).Scan(&columnType) + jd.assertError(err) + jd.logger.Infow("previous column type", "type", columnType) + if columnType != string(jd.conf.payloadColumnType) { + var jobID int64 + err := jd.dbHandle.QueryRowContext( + ctx, + fmt.Sprintf(`select job_id from %q order by job_id asc limit 1`, dsList[len(dsList)-1].JobTable), + ).Scan(&jobID) + if errors.Is(err, sql.ErrNoRows) { + updateColumnType = true + } else if err == nil { + createDS = true + } else { + jd.assertError(err) + } + } + } + if createDS { jd.addNewDS(l, newDataSet(jd.tablePrefix, jd.computeNewIdxForAppend(l))) + } else if updateColumnType { + var payloadType string + switch jd.conf.payloadColumnType { + case payloadColumnType(0): + payloadType = "jsonb" + case payloadColumnType(1): + payloadType = "bytea" + case payloadColumnType(2): + payloadType = "text" + default: + jd.assertError(fmt.Errorf("invalid type: %d", jd.conf.payloadColumnType)) + } + _, err := jd.dbHandle.ExecContext( + ctx, + fmt.Sprintf(`alter table %q alter column event_payload type %s`, dsList[len(dsList)-1].JobTable, payloadType), + ) + jd.assertError(err) } jd.backgroundGroup.Go(crash.Wrapper(func() error {