diff --git a/pkg/frontend/authenticate_test.go b/pkg/frontend/authenticate_test.go index bab339be80a31..60ef68f7a21f5 100644 --- a/pkg/frontend/authenticate_test.go +++ b/pkg/frontend/authenticate_test.go @@ -11927,7 +11927,7 @@ func TestCheckTimeStampValid(t *testing.T) { func Test_getSqlForCheckDupPitrFormat(t *testing.T) { sql := getSqlForCheckDupPitrFormat(123, 456) - assert.Equal(t, "select pitr_id from mo_catalog.mo_pitr where create_account = 123 and obj_id = 456;", sql) + assert.Equal(t, "select pitr_id from mo_catalog.mo_pitr where create_account = 123 and obj_id = 456 and kind = 'user';", sql) } func Test_checkPitrDup(t *testing.T) { diff --git a/pkg/frontend/data_branch.go b/pkg/frontend/data_branch.go index f324c21d04791..914f221e700aa 100644 --- a/pkg/frontend/data_branch.go +++ b/pkg/frontend/data_branch.go @@ -26,6 +26,7 @@ import ( "sync/atomic" "time" + "github.com/google/uuid" "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/malloc" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -49,6 +50,31 @@ import ( "go.uber.org/zap" ) +const ( + dataBranchPitrNamePrefix = "__mo_data_branch_pitr" + dataBranchPitrLength = 1 + dataBranchPitrUnit = "y" + dataBranchPitrKind = "internal" +) + +type dataBranchPitrRecord struct { + pitrName string + level string + accountID uint64 + accountName string + databaseName string + tableName string + objectID uint64 +} + +type dataBranchExistingPitrRecord struct { + exists bool + kind string + active bool + pitrLength uint64 + pitrUnit string +} + func newBranchHashmapAllocator(limitRate float64) *branchHashmapAllocator { throttler := rscthrottler.NewMemThrottler( "DataBranchHashmap", @@ -352,6 +378,10 @@ func dataBranchCreateTable( return } + if err = createDataBranchTablePitr(execCtx.reqCtx, ses, bh, receipt); err != nil { + return + } + return nil } @@ -394,9 +424,789 @@ func dataBranchCreateDatabase( } } + if err = createDataBranchDatabasePitr(execCtx.reqCtx, ses, bh, &stmt.CloneDatabase, receipts); err != nil { + return + } + return nil } +func dataBranchPitrName(level string, objectID string) string { + return fmt.Sprintf("%s_%s_%s", dataBranchPitrNamePrefix, level, objectID) +} + +func quoteSQLIdentifier(s string) string { + return "`" + strings.ReplaceAll(s, "`", "``") + "`" +} + +func dataBranchCloneTxn(ctx context.Context, bh BackgroundExec) (TxnOperator, error) { + be, ok := bh.(*backExec) + if !ok || be.backSes == nil || be.backSes.GetTxnHandler() == nil { + return nil, moerr.NewInternalError(ctx, "data branch pitr requires background transaction") + } + return be.backSes.GetTxnHandler().GetTxn(), nil +} + +func createDataBranchTablePitr( + ctx context.Context, + ses *Session, + bh BackgroundExec, + receipt cloneReceipt, +) error { + if receipt.dstDb == "" || receipt.dstTbl == "" { + return moerr.NewInternalError(ctx, "data branch table pitr requires target table") + } + + cloneTxnOp, err := dataBranchCloneTxn(ctx, bh) + if err != nil { + return err + } + if err = createDataBranchTablePitrForTarget( + ctx, ses, bh, cloneTxnOp, receipt.toAccount, receipt.dstDb, receipt.dstTbl, "target", + ); err != nil { + return err + } + if receipt.srcDb == "" || receipt.srcTbl == "" { + return nil + } + return createDataBranchTablePitrForTarget( + ctx, ses, bh, cloneTxnOp, receipt.srcAccount, receipt.srcDb, receipt.srcTbl, "source", + ) +} + +func createDataBranchDatabasePitr( + ctx context.Context, + ses *Session, + bh BackgroundExec, + stmt *tree.CloneDatabase, + receipts []cloneReceipt, +) error { + if stmt == nil { + return moerr.NewInternalError(ctx, "data branch database pitr requires clone database statement") + } + + var ( + toAccountID uint32 + srcAccountID uint32 + err error + ) + if len(receipts) > 0 { + toAccountID = receipts[0].toAccount + srcAccountID = receipts[0].srcAccount + } else { + var snapshot *plan2.Snapshot + srcAccountID, toAccountID, snapshot, err = getOpAndToAccountId( + ctx, ses, bh, stmt.ToAccountOpt, stmt.AtTsExpr, + ) + if err != nil { + return err + } + if snapshot != nil && snapshot.Tenant != nil { + srcAccountID = snapshot.Tenant.TenantID + } + } + + dstDb := stmt.DstDatabase.String() + if dstDb == "" { + return moerr.NewInternalError(ctx, "data branch database pitr requires target database") + } + cloneTxnOp, err := dataBranchCloneTxn(ctx, bh) + if err != nil { + return err + } + if err = createDataBranchDatabasePitrForTarget( + ctx, ses, bh, cloneTxnOp, toAccountID, dstDb, "target", + ); err != nil { + return err + } + + srcDb := stmt.SrcDatabase.String() + if len(receipts) > 0 && receipts[0].srcDb != "" { + srcDb = receipts[0].srcDb + } + if srcDb == "" { + return nil + } + return createDataBranchDatabasePitrForTarget( + ctx, ses, bh, cloneTxnOp, srcAccountID, srcDb, "source", + ) +} + +func createDataBranchTablePitrForTarget( + ctx context.Context, + ses *Session, + bh BackgroundExec, + cloneTxnOp TxnOperator, + accountID uint32, + dbName string, + tableName string, + role string, +) error { + tableID, err := dataBranchTableID(ctx, ses, cloneTxnOp, accountID, dbName, tableName) + if err != nil { + return err + } + pitrName := dataBranchPitrName("table", strconv.FormatUint(tableID, 10)) + accountName, err := dataBranchPitrAccountName(ctx, ses, bh, accountID) + if err != nil { + return err + } + logutil.Info( + "DataBranch-CreatePITR-Table", + zap.String("role", role), + zap.String("pitr-name", pitrName), + zap.Uint64("table-id", tableID), + zap.Uint32("account-id", accountID), + zap.String("database", dbName), + zap.String("table", tableName), + ) + return createDataBranchPitrRecord(ctx, ses, bh, cloneTxnOp, dataBranchPitrRecord{ + pitrName: pitrName, + level: tree.PITRLEVELTABLE.String(), + accountID: uint64(accountID), + accountName: accountName, + databaseName: dbName, + tableName: tableName, + objectID: tableID, + }) +} + +func createDataBranchDatabasePitrForTarget( + ctx context.Context, + ses *Session, + bh BackgroundExec, + cloneTxnOp TxnOperator, + accountID uint32, + dbName string, + role string, +) error { + databaseID, err := dataBranchDatabaseID(ctx, ses, cloneTxnOp, accountID, dbName) + if err != nil { + return err + } + pitrName := dataBranchPitrName("database", strconv.FormatUint(databaseID, 10)) + accountName, err := dataBranchPitrAccountName(ctx, ses, bh, accountID) + if err != nil { + return err + } + logutil.Info( + "DataBranch-CreatePITR-Database", + zap.String("role", role), + zap.String("pitr-name", pitrName), + zap.Uint64("database-id", databaseID), + zap.Uint32("account-id", accountID), + zap.String("database", dbName), + ) + return createDataBranchPitrRecord(ctx, ses, bh, cloneTxnOp, dataBranchPitrRecord{ + pitrName: pitrName, + level: tree.PITRLEVELDATABASE.String(), + accountID: uint64(accountID), + accountName: accountName, + databaseName: dbName, + objectID: databaseID, + }) +} + +func dataBranchDatabaseID( + ctx context.Context, + ses *Session, + cloneTxnOp TxnOperator, + accountID uint32, + dbName string, +) (uint64, error) { + dbCtx := defines.AttachAccountId(ctx, accountID) + db, err := ses.proc.GetSessionInfo().StorageEngine.Database(dbCtx, dbName, cloneTxnOp) + if err != nil { + return 0, err + } + return strconv.ParseUint(db.GetDatabaseId(dbCtx), 10, 64) +} + +func dataBranchTableID( + ctx context.Context, + ses *Session, + cloneTxnOp TxnOperator, + accountID uint32, + dbName string, + tableName string, +) (uint64, error) { + dbCtx := defines.AttachAccountId(ctx, accountID) + db, err := ses.proc.GetSessionInfo().StorageEngine.Database(dbCtx, dbName, cloneTxnOp) + if err != nil { + return 0, err + } + rel, err := db.Relation(dbCtx, tableName, nil) + if err != nil { + return 0, err + } + return rel.GetTableDef(dbCtx).TblId, nil +} + +func dataBranchPitrAccountName( + ctx context.Context, + ses *Session, + bh BackgroundExec, + accountID uint32, +) (string, error) { + if accountID == sysAccountID { + return sysAccountName, nil + } + if tenantInfo := ses.GetTenantInfo(); tenantInfo != nil && accountID == tenantInfo.GetTenantID() { + return tenantInfo.GetTenant(), nil + } + + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "select account_name from mo_catalog.mo_account where account_id = %d", + accountID, + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return "", err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return "", err + } + if !execResultArrayHasData(erArray) { + return "", moerr.NewInternalErrorf(ctx, "account %d does not exist", accountID) + } + return erArray[0].GetString(sysCtx, 0, 0) +} + +func createDataBranchPitrRecord( + ctx context.Context, + ses *Session, + bh BackgroundExec, + cloneTxnOp TxnOperator, + record dataBranchPitrRecord, +) error { + now := cloneTxnOp.SnapshotTS().ToStdTime().UTC().UnixNano() + existing, err := getDataBranchPitrRecord(ctx, bh, record.pitrName, record.accountID) + if err != nil { + return err + } + if existing.exists { + if err = ensureDataBranchPitrRecord(ctx, bh, now, record, existing); err != nil { + return err + } + return ensureDataBranchMoCatalogPitr( + ctx, ses, bh, cloneTxnOp, now, + ) + } + + pitrID, err := uuid.NewV7() + if err != nil { + return err + } + sql, err := getSqlForCreateDataBranchPitrRecord(ctx, pitrID.String(), now, record) + if err != nil { + return err + } + + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + if err = bh.Exec(sysCtx, sql); err != nil { + // Concurrent branch creation can win the primary key race. Re-read the + // record and normalize it if it is the expected internal PITR. + existing, readErr := getDataBranchPitrRecord(ctx, bh, record.pitrName, record.accountID) + if readErr == nil && existing.exists && existing.kind == dataBranchPitrKind { + if ensureErr := ensureDataBranchPitrRecord(ctx, bh, now, record, existing); ensureErr != nil { + return ensureErr + } + return ensureDataBranchMoCatalogPitr(ctx, ses, bh, cloneTxnOp, now) + } + return err + } + return ensureDataBranchMoCatalogPitr(ctx, ses, bh, cloneTxnOp, now) +} + +func getDataBranchPitrRecord( + ctx context.Context, + bh BackgroundExec, + pitrName string, + accountID uint64, +) (dataBranchExistingPitrRecord, error) { + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "select kind, pitr_status, pitr_length, pitr_unit from mo_catalog.mo_pitr where pitr_name = %s and create_account = %d", + quoteSQLStringLiteral(pitrName), + accountID, + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return dataBranchExistingPitrRecord{}, err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return dataBranchExistingPitrRecord{}, err + } + if !execResultArrayHasData(erArray) { + return dataBranchExistingPitrRecord{}, nil + } + kind, err := erArray[0].GetString(sysCtx, 0, 0) + if err != nil { + return dataBranchExistingPitrRecord{}, err + } + status, err := erArray[0].GetUint64(sysCtx, 0, 1) + if err != nil { + return dataBranchExistingPitrRecord{}, err + } + pitrLength, err := erArray[0].GetUint64(sysCtx, 0, 2) + if err != nil { + return dataBranchExistingPitrRecord{}, err + } + pitrUnit, err := erArray[0].GetString(sysCtx, 0, 3) + if err != nil { + return dataBranchExistingPitrRecord{}, err + } + return dataBranchExistingPitrRecord{ + exists: true, + kind: kind, + active: status == 1, + pitrLength: pitrLength, + pitrUnit: pitrUnit, + }, nil +} + +func ensureDataBranchPitrRecord( + ctx context.Context, + bh BackgroundExec, + now int64, + record dataBranchPitrRecord, + existing dataBranchExistingPitrRecord, +) error { + setParts := []string{ + fmt.Sprintf("modified_time = %d", now), + fmt.Sprintf("level = %s", quoteSQLStringLiteral(record.level)), + fmt.Sprintf("account_id = %d", record.accountID), + fmt.Sprintf("account_name = %s", quoteSQLStringLiteral(record.accountName)), + fmt.Sprintf("database_name = %s", quoteSQLStringLiteral(record.databaseName)), + fmt.Sprintf("table_name = %s", quoteSQLStringLiteral(record.tableName)), + fmt.Sprintf("obj_id = %d", record.objectID), + "pitr_status = 1", + fmt.Sprintf("kind = %s", quoteSQLStringLiteral(dataBranchPitrKind)), + } + if !existing.active { + setParts = append(setParts, fmt.Sprintf("pitr_status_changed_time = %d", now)) + } + needDurationUpdate, err := dataBranchPitrDurationNeedsUpdate(existing.pitrLength, existing.pitrUnit) + if err != nil { + return err + } + if needDurationUpdate { + setParts = append(setParts, + fmt.Sprintf("pitr_length = %d", dataBranchPitrLength), + fmt.Sprintf("pitr_unit = %s", quoteSQLStringLiteral(dataBranchPitrUnit)), + ) + } + + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "update mo_catalog.mo_pitr set %s where pitr_name = %s and create_account = %d", + strings.Join(setParts, ", "), + quoteSQLStringLiteral(record.pitrName), + record.accountID, + ) + return bh.Exec(sysCtx, sql) +} + +func dataBranchPitrDurationNeedsUpdate(oldLength uint64, oldUnit string) (bool, error) { + oldMinTs, err := addTimeSpan(time.Time{}, int(oldLength), oldUnit) + if err != nil { + return false, err + } + newMinTs, err := addTimeSpan(time.Time{}, dataBranchPitrLength, dataBranchPitrUnit) + if err != nil { + return false, err + } + return newMinTs.Before(oldMinTs), nil +} + +func getSqlForCreateDataBranchPitrRecord( + ctx context.Context, + pitrID string, + now int64, + record dataBranchPitrRecord, +) (string, error) { + if err := inputNameIsInvalid(ctx, record.pitrName); err != nil { + return "", err + } + return fmt.Sprintf(`insert into mo_catalog.mo_pitr( + pitr_id, + pitr_name, + create_account, + create_time, + modified_time, + level, + account_id, + account_name, + database_name, + table_name, + obj_id, + pitr_length, + pitr_unit, + pitr_status_changed_time, + kind) values (%s, %s, %d, %d, %d, %s, %d, %s, %s, %s, %d, %d, %s, %d, %s);`, + quoteSQLStringLiteral(pitrID), + quoteSQLStringLiteral(record.pitrName), + record.accountID, + now, + now, + quoteSQLStringLiteral(record.level), + record.accountID, + quoteSQLStringLiteral(record.accountName), + quoteSQLStringLiteral(record.databaseName), + quoteSQLStringLiteral(record.tableName), + record.objectID, + dataBranchPitrLength, + quoteSQLStringLiteral(dataBranchPitrUnit), + now, + quoteSQLStringLiteral(dataBranchPitrKind), + ), nil +} + +func ensureDataBranchMoCatalogPitr( + ctx context.Context, + ses *Session, + bh BackgroundExec, + cloneTxnOp TxnOperator, + now int64, +) error { + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "select pitr_length, pitr_unit from mo_catalog.mo_pitr where pitr_name = %s", + quoteSQLStringLiteral(SYSMOCATALOGPITR), + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return err + } + + if execResultArrayHasData(erArray) { + sysPitrLength, err := erArray[0].GetUint64(sysCtx, 0, 0) + if err != nil { + return err + } + sysPitrUnit, err := erArray[0].GetString(sysCtx, 0, 1) + if err != nil { + return err + } + oldMinTs, err := addTimeSpan(time.Time{}, int(sysPitrLength), sysPitrUnit) + if err != nil { + return err + } + newMinTs, err := addTimeSpan(time.Time{}, dataBranchPitrLength, dataBranchPitrUnit) + if err != nil { + return err + } + setParts := []string{ + fmt.Sprintf("modified_time = %d", now), + "pitr_status = 1", + fmt.Sprintf("kind = %s", quoteSQLStringLiteral(dataBranchPitrKind)), + } + if !newMinTs.Before(oldMinTs) { + sql = fmt.Sprintf( + "update mo_catalog.mo_pitr set %s where pitr_name = %s and create_account = %d", + strings.Join(setParts, ", "), + quoteSQLStringLiteral(SYSMOCATALOGPITR), + sysAccountID, + ) + return bh.Exec(sysCtx, sql) + } + setParts = append(setParts, + fmt.Sprintf("pitr_length = %d", dataBranchPitrLength), + fmt.Sprintf("pitr_unit = %s", quoteSQLStringLiteral(dataBranchPitrUnit)), + ) + sql = fmt.Sprintf( + "update mo_catalog.mo_pitr set %s where pitr_name = %s and create_account = %d", + strings.Join(setParts, ", "), + quoteSQLStringLiteral(SYSMOCATALOGPITR), + sysAccountID, + ) + return bh.Exec(sysCtx, sql) + } + + moCatalogDB, err := ses.proc.GetSessionInfo().StorageEngine.Database( + sysCtx, catalog.MO_CATALOG, cloneTxnOp, + ) + if err != nil { + return err + } + moCatalogID, err := strconv.ParseUint(moCatalogDB.GetDatabaseId(sysCtx), 10, 64) + if err != nil { + return err + } + pitrID, err := uuid.NewV7() + if err != nil { + return err + } + sql, err = getSqlForCreateDataBranchPitrRecord(ctx, pitrID.String(), now, dataBranchPitrRecord{ + pitrName: SYSMOCATALOGPITR, + level: tree.PITRLEVELDATABASE.String(), + accountID: sysAccountID, + accountName: sysAccountName, + databaseName: catalog.MO_CATALOG, + objectID: moCatalogID, + }) + if err != nil { + return err + } + return bh.Exec(sysCtx, sql) +} + +func deleteDataBranchInternalPitrRecord( + ctx context.Context, + bh BackgroundExec, + accountID uint64, + pitrName string, +) error { + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "delete from mo_catalog.mo_pitr where pitr_name = %s and create_account = %d", + quoteSQLStringLiteral(pitrName), + accountID, + ) + if err := bh.Exec(sysCtx, sql); err != nil { + return err + } + return cleanupDataBranchMoCatalogPitrIfUnused(ctx, bh) +} + +func cleanupDataBranchMoCatalogPitrIfUnused(ctx context.Context, bh BackgroundExec) error { + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "select pitr_id from mo_catalog.mo_pitr where pitr_name != %s limit 1", + quoteSQLStringLiteral(SYSMOCATALOGPITR), + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return err + } + if execResultArrayHasData(erArray) { + return nil + } + sql = fmt.Sprintf( + "delete from mo_catalog.mo_pitr where pitr_name = %s and create_account = %d and kind = %s", + quoteSQLStringLiteral(SYSMOCATALOGPITR), + sysAccountID, + quoteSQLStringLiteral(dataBranchPitrKind), + ) + return bh.Exec(sysCtx, sql) +} + +func cleanupDataBranchTablePitrIfUnused( + ctx context.Context, + bh BackgroundExec, + accountID uint64, + tableID uint64, +) error { + hasRef, err := dataBranchTablesHaveActiveReference(ctx, bh, []uint64{tableID}) + if err != nil { + return err + } + if hasRef { + return nil + } + return deleteDataBranchInternalPitrRecord( + ctx, + bh, + accountID, + dataBranchPitrName("table", strconv.FormatUint(tableID, 10)), + ) +} + +type dataBranchDatabasePitrForCleanup struct { + accountID uint64 + databaseID uint64 + database string + createTime int64 +} + +func getDataBranchDatabasePitrForCleanup( + ctx context.Context, + bh BackgroundExec, + accountID uint64, + databaseID uint64, +) (dataBranchDatabasePitrForCleanup, bool, error) { + if databaseID == 0 { + return dataBranchDatabasePitrForCleanup{}, false, nil + } + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + pitrName := dataBranchPitrName("database", strconv.FormatUint(databaseID, 10)) + sql := fmt.Sprintf( + "select create_account, obj_id, database_name, create_time from mo_catalog.mo_pitr where create_account = %d and pitr_name = %s and level = %s limit 1", + accountID, + quoteSQLStringLiteral(pitrName), + quoteSQLStringLiteral(tree.PITRLEVELDATABASE.String()), + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return dataBranchDatabasePitrForCleanup{}, false, err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return dataBranchDatabasePitrForCleanup{}, false, err + } + if !execResultArrayHasData(erArray) { + return dataBranchDatabasePitrForCleanup{}, false, nil + } + + pitrAccountID, err := erArray[0].GetUint64(sysCtx, 0, 0) + if err != nil { + return dataBranchDatabasePitrForCleanup{}, false, err + } + pitrDatabaseID, err := erArray[0].GetUint64(sysCtx, 0, 1) + if err != nil { + return dataBranchDatabasePitrForCleanup{}, false, err + } + database, err := erArray[0].GetString(sysCtx, 0, 2) + if err != nil { + return dataBranchDatabasePitrForCleanup{}, false, err + } + createTime, err := erArray[0].GetInt64(sysCtx, 0, 3) + if err != nil { + return dataBranchDatabasePitrForCleanup{}, false, err + } + return dataBranchDatabasePitrForCleanup{ + accountID: pitrAccountID, + databaseID: pitrDatabaseID, + database: database, + createTime: createTime, + }, true, nil +} + +func cleanupDataBranchDatabasePitrByIDIfUnused( + ctx context.Context, + bh BackgroundExec, + accountID uint64, + databaseID uint64, +) error { + pitr, found, err := getDataBranchDatabasePitrForCleanup(ctx, bh, accountID, databaseID) + if err != nil || !found { + return err + } + tableIDs, err := dataBranchDatabaseTableIDsAt(ctx, bh, pitr.accountID, pitr.database, pitr.createTime) + if err != nil { + return err + } + return cleanupDataBranchDatabasePitrIfUnused(ctx, bh, pitr.accountID, pitr.databaseID, tableIDs) +} + +func cleanupDataBranchDatabasePitrIfUnused( + ctx context.Context, + bh BackgroundExec, + accountID uint64, + databaseID uint64, + tableIDs []uint64, +) error { + if len(tableIDs) > 0 { + hasRef, err := dataBranchTablesHaveActiveReference(ctx, bh, tableIDs) + if err != nil { + return err + } + if hasRef { + return nil + } + } + return deleteDataBranchInternalPitrRecord( + ctx, + bh, + accountID, + dataBranchPitrName("database", strconv.FormatUint(databaseID, 10)), + ) +} + +func dataBranchDatabaseTableIDsAt( + ctx context.Context, + bh BackgroundExec, + accountID uint64, + databaseName string, + ts int64, +) ([]uint64, error) { + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "select rel_id from mo_catalog.mo_tables {MO_TS = %d} where account_id = %d and reldatabase = %s", + ts, + accountID, + quoteSQLStringLiteral(databaseName), + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return nil, err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return nil, err + } + if !execResultArrayHasData(erArray) { + return nil, nil + } + tableIDs := make([]uint64, 0, erArray[0].GetRowCount()) + for i := uint64(0); i < erArray[0].GetRowCount(); i++ { + tableID, err := erArray[0].GetUint64(sysCtx, i, 0) + if err != nil { + return nil, err + } + tableIDs = append(tableIDs, tableID) + } + return tableIDs, nil +} + +func dataBranchTablesHaveActiveReference( + ctx context.Context, + bh BackgroundExec, + tableIDs []uint64, +) (bool, error) { + if len(tableIDs) == 0 { + return false, nil + } + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + const batchSize = 512 + for start := 0; start < len(tableIDs); start += batchSize { + end := start + batchSize + if end > len(tableIDs) { + end = len(tableIDs) + } + idList := dataBranchUintListSQL(tableIDs[start:end]) + sql := fmt.Sprintf( + "select table_id from mo_catalog.mo_branch_metadata where table_deleted = false and (table_id in (%s) or p_table_id in (%s)) limit 1", + idList, + idList, + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return false, err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return false, err + } + if execResultArrayHasData(erArray) { + return true, nil + } + } + return false, nil +} + +func dataBranchUintListSQL(ids []uint64) string { + var sqlBuilder strings.Builder + for i, id := range ids { + if i > 0 { + sqlBuilder.WriteByte(',') + } + sqlBuilder.WriteString(strconv.FormatUint(id, 10)) + } + return sqlBuilder.String() +} + func markBranchTablesDeleted( ctx context.Context, ses *Session, @@ -468,6 +1278,7 @@ func dataBranchDeleteTable( accId uint32 sqlRet executor.Result tblID uint64 + dbID uint64 found bool ) @@ -481,8 +1292,12 @@ func dataBranchDeleteTable( if sqlRet, err = runSql( execCtx.reqCtx, ses, bh, fmt.Sprintf( - "select rel_id from %s.%s where account_id = %d and reldatabase = '%s' and relname = '%s'", - catalog.MO_CATALOG, catalog.MO_TABLES, accId, dbName, tblName, + "select rel_id, reldatabase_id from %s.%s where account_id = %d and reldatabase = %s and relname = %s", + catalog.MO_CATALOG, + catalog.MO_TABLES, + accId, + quoteSQLStringLiteral(string(dbName)), + quoteSQLStringLiteral(string(tblName)), ), nil, nil, ); err != nil { return @@ -493,6 +1308,7 @@ func dataBranchDeleteTable( return false } tblID = vector.GetFixedAtWithTypeCheck[uint64](cols[0], 0) + dbID = vector.GetFixedAtWithTypeCheck[uint64](cols[1], 0) found = true return false }) @@ -504,7 +1320,11 @@ func dataBranchDeleteTable( dropRet.Close() }() - dropSQL := fmt.Sprintf("drop table if exists `%s`.`%s`", dbName, tblName) + dropSQL := fmt.Sprintf( + "drop table if exists %s.%s", + quoteSQLIdentifier(string(dbName)), + quoteSQLIdentifier(string(tblName)), + ) if dropRet, err = runSql(execCtx.reqCtx, ses, bh, dropSQL, nil, nil); err != nil { return } @@ -518,6 +1338,13 @@ func dataBranchDeleteTable( return } + if err = cleanupDataBranchTablePitrIfUnused(execCtx.reqCtx, bh, uint64(accId), tblID); err != nil { + return + } + if err = cleanupDataBranchDatabasePitrByIDIfUnused(execCtx.reqCtx, bh, uint64(accId), dbID); err != nil { + return + } + return nil } @@ -545,6 +1372,8 @@ func dataBranchDeleteDatabase( dbName = stmt.DatabaseName accId uint32 sqlRet executor.Result + dbID uint64 + dbFound bool tableIDs []uint64 ) @@ -554,8 +1383,33 @@ func dataBranchDeleteDatabase( if sqlRet, err = runSql( execCtx.reqCtx, ses, bh, fmt.Sprintf( - "select rel_id from %s.%s where account_id = %d and reldatabase = '%s'", - catalog.MO_CATALOG, catalog.MO_TABLES, accId, dbName, + "select dat_id from %s.%s where account_id = %d and datname = %s", + catalog.MO_CATALOG, + catalog.MO_DATABASE, + accId, + quoteSQLStringLiteral(string(dbName)), + ), nil, nil, + ); err != nil { + return + } + + sqlRet.ReadRows(func(rows int, cols []*vector.Vector) bool { + if rows == 0 { + return false + } + dbID = vector.GetFixedAtWithTypeCheck[uint64](cols[0], 0) + dbFound = true + return false + }) + sqlRet.Close() + + if sqlRet, err = runSql( + execCtx.reqCtx, ses, bh, fmt.Sprintf( + "select rel_id from %s.%s where account_id = %d and reldatabase = %s", + catalog.MO_CATALOG, + catalog.MO_TABLES, + accId, + quoteSQLStringLiteral(string(dbName)), ), nil, nil, ); err != nil { return @@ -576,7 +1430,7 @@ func dataBranchDeleteDatabase( dropRet.Close() }() - dropSQL := fmt.Sprintf("drop database if exists `%s`", dbName) + dropSQL := fmt.Sprintf("drop database if exists %s", quoteSQLIdentifier(string(dbName))) if dropRet, err = runSql(execCtx.reqCtx, ses, bh, dropSQL, nil, nil); err != nil { return } @@ -586,6 +1440,17 @@ func dataBranchDeleteDatabase( return } + for _, tableID := range tableIDs { + if err = cleanupDataBranchTablePitrIfUnused(execCtx.reqCtx, bh, uint64(accId), tableID); err != nil { + return + } + } + if dbFound { + if err = cleanupDataBranchDatabasePitrByIDIfUnused(execCtx.reqCtx, bh, uint64(accId), dbID); err != nil { + return + } + } + return nil } diff --git a/pkg/frontend/data_branch_helpers_test.go b/pkg/frontend/data_branch_helpers_test.go index e25e388ce3ef0..974964f6e1326 100644 --- a/pkg/frontend/data_branch_helpers_test.go +++ b/pkg/frontend/data_branch_helpers_test.go @@ -83,6 +83,11 @@ func TestContainsDataBranchTempTableName(t *testing.T) { require.False(t, containsDataBranchTempTableName("select '__mo_diff_del_merge_1'")) } +func TestQuoteSQLIdentifier(t *testing.T) { + require.Equal(t, "`plain`", quoteSQLIdentifier("plain")) + require.Equal(t, "`a``b`", quoteSQLIdentifier("a`b")) +} + func TestRunSQL_BackgroundExecPaths(t *testing.T) { ses := newValidateSession(t) diff --git a/pkg/frontend/pitr.go b/pkg/frontend/pitr.go index a48a71cdb10f7..8ca1d943481c4 100644 --- a/pkg/frontend/pitr.go +++ b/pkg/frontend/pitr.go @@ -54,15 +54,15 @@ var ( pitr_unit, pitr_status_changed_time) values ('%s', '%s', %d, %d, %d, '%s', %d, '%s', '%s', '%s', %d, %d, '%s', %d);` - checkPitrFormat = `select pitr_id from mo_catalog.mo_pitr where pitr_name = "%s" and create_account = %d order by pitr_id;` + checkPitrFormat = `select pitr_id from mo_catalog.mo_pitr where pitr_name = "%s" and create_account = %d and kind = 'user' order by pitr_id;` - dropPitrFormat = `delete from mo_catalog.mo_pitr where pitr_name = '%s' and create_account = %d order by pitr_id;` + dropPitrFormat = `delete from mo_catalog.mo_pitr where pitr_name = '%s' and create_account = %d and kind = 'user' order by pitr_id;` - alterPitrFormat = `update mo_catalog.mo_pitr set modified_time = %d, pitr_length = %d, pitr_unit = '%s' where pitr_name = '%s' and create_account = %d;` + alterPitrFormat = `update mo_catalog.mo_pitr set modified_time = %d, pitr_length = %d, pitr_unit = '%s' where pitr_name = '%s' and create_account = %d and kind = 'user';` getPitrFormat = `select * from mo_catalog.mo_pitr` - checkDupPitrFormat = `select pitr_id from mo_catalog.mo_pitr where create_account = %d and obj_id = %d;` + checkDupPitrFormat = `select pitr_id from mo_catalog.mo_pitr where create_account = %d and obj_id = %d and kind = 'user';` getSqlForCheckDatabaseFmt = `select dat_id from mo_catalog.mo_database {MO_TS = %d} where datname = '%s';` @@ -73,9 +73,9 @@ var ( getPubInfoWithPitrFormat = `select pub_name, database_name, database_id, table_list, account_list, created_time, update_time, owner, creator, comment from mo_catalog.mo_pubs {MO_TS = %d} where account_id = %d and database_name = '%s';` // update mo_pitr object id - updateMoPitrAccountObjectIdFmt = `update mo_catalog.mo_pitr set pitr_status = 0, pitr_status_changed_time = %d where account_name = '%s' and pitr_status = 1 and obj_id = %d;` + updateMoPitrAccountObjectIdFmt = `update mo_catalog.mo_pitr set pitr_status = 0, pitr_status_changed_time = %d where account_name = '%s' and pitr_status = 1 and obj_id = %d and kind = 'user';` - getLengthAndUnitFmt = `select pitr_length, pitr_unit from mo_catalog.mo_pitr where account_id = %d and level = '%s'` + getLengthAndUnitFmt = `select pitr_length, pitr_unit from mo_catalog.mo_pitr where account_id = %d and level = '%s' and pitr_status = 1 and kind = 'user'` ) type pitrRecord struct { @@ -98,6 +98,12 @@ const ( SYSMOCATALOGPITR = "sys_mo_catalog_pitr" ) +func isReservedPitrName(pitrName string) bool { + return pitrName == SYSMOCATALOGPITR || + pitrName == dataBranchPitrNamePrefix || + strings.HasPrefix(pitrName, dataBranchPitrNamePrefix+"_") +} + func getSqlForCreatePitr( ctx context.Context, pitrId, pitrName string, @@ -232,14 +238,14 @@ func getSqlForCheckPitrDup(createAccount string, createAccountId uint64, stmt *t return getSqlForCheckDupPitrFormat(createAccountId, math.MaxUint64) case tree.PITRLEVELACCOUNT: if len(stmt.AccountName) > 0 { - return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and account_name = '%s' and level = 'account' and pitr_status = 1;", stmt.AccountName) + return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and account_name = '%s' and level = 'account' and pitr_status = 1 and kind = 'user';", stmt.AccountName) } else { - return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and account_name = '%s' and level = 'account' and pitr_status = 1;", createAccount) + return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and account_name = '%s' and level = 'account' and pitr_status = 1 and kind = 'user';", createAccount) } case tree.PITRLEVELDATABASE: - return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and database_name = '%s' and level = 'database' and pitr_status = 1;", stmt.DatabaseName) + return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and database_name = '%s' and level = 'database' and pitr_status = 1 and kind = 'user';", stmt.DatabaseName) case tree.PITRLEVELTABLE: - return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and database_name = '%s' and table_name = '%s' and level = 'table' and pitr_status = 1;", stmt.DatabaseName, stmt.TableName) + return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and database_name = '%s' and table_name = '%s' and level = 'table' and pitr_status = 1 and kind = 'user';", stmt.DatabaseName, stmt.TableName) } return sql } @@ -338,7 +344,7 @@ func doCreatePitr(ctx context.Context, ses *Session, stmt *tree.CreatePitr) erro // 6.check pitr exists or not pitrName = string(stmt.Name) - if pitrName == SYSMOCATALOGPITR { + if isReservedPitrName(pitrName) { return moerr.NewInternalError(ctx, "pitr name is reserved") } @@ -775,6 +781,16 @@ func doDropPitr(ctx context.Context, ses *Session, stmt *tree.DropPitr) (err err return err } + // check pitr exists or not + tenantInfo := ses.GetTenantInfo() + if isReservedPitrName(string(stmt.Name)) { + // Reserved PITRs are maintained internally; IF EXISTS keeps cleanup SQL idempotent. + if stmt.IfExists { + return nil + } + return moerr.NewInternalError(ctx, "pitr name is reserved") + } + err = bh.Exec(ctx, "begin;") defer func() { err = finishTxn(ctx, bh, err) @@ -782,9 +798,6 @@ func doDropPitr(ctx context.Context, ses *Session, stmt *tree.DropPitr) (err err if err != nil { return err } - - // check pitr exists or not - tenantInfo := ses.GetTenantInfo() pitrExist, err = checkPitrExistOrNot(ctx, bh, string(stmt.Name), uint64(tenantInfo.GetTenantID())) if err != nil { return err @@ -875,6 +888,9 @@ func doAlterPitr(ctx context.Context, ses *Session, stmt *tree.AlterPitr) (err e // check pitr exists or not tenantInfo := ses.GetTenantInfo() + if isReservedPitrName(string(stmt.Name)) { + return moerr.NewInternalError(ctx, "pitr name is reserved") + } pitrExist, err = checkPitrExistOrNot(ctx, bh, string(stmt.Name), uint64(tenantInfo.GetTenantID())) if err != nil { return err @@ -935,6 +951,9 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (s // get pitr name pitrName := string(stmt.Name) + if isReservedPitrName(pitrName) { + return stats, moerr.NewInternalError(ctx, "pitr name is reserved") + } accountName := string(stmt.AccountName) dbName := string(stmt.DatabaseName) tblName := string(stmt.TableName) @@ -1910,7 +1929,7 @@ func getPitrByName(ctx context.Context, bh BackgroundExec, pitrName string, acco return nil, err } - sql := fmt.Sprintf("%s where pitr_name = '%s' and create_account = %d", getPitrFormat, pitrName, accountId) + sql := fmt.Sprintf("%s where pitr_name = '%s' and create_account = %d and kind = 'user'", getPitrFormat, pitrName, accountId) if records, err := getPitrRecords(newCtx, bh, sql); err != nil { return nil, err } else if len(records) != 1 { diff --git a/pkg/frontend/pitr_test.go b/pkg/frontend/pitr_test.go index b2b48c9ed636b..bd0296edc9df4 100644 --- a/pkg/frontend/pitr_test.go +++ b/pkg/frontend/pitr_test.go @@ -289,7 +289,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -369,7 +369,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -451,7 +451,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -534,7 +534,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -618,7 +618,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -716,7 +716,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -813,7 +813,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -911,7 +911,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1007,7 +1007,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1088,7 +1088,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1172,7 +1172,7 @@ func Test_doRestorePitrValid(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1253,7 +1253,7 @@ func Test_doRestorePitrValid(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1296,7 +1296,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { stmt: &tree.CreatePitr{ Level: tree.PITRLEVELCLUSTER, }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and obj_id = 18446744073709551615;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and obj_id = 18446744073709551615 and kind = 'user';", }, { createAccount: "sys", @@ -1304,7 +1304,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { stmt: &tree.CreatePitr{ Level: tree.PITRLEVELACCOUNT, }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and account_name = 'sys' and level = 'account' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and account_name = 'sys' and level = 'account' and pitr_status = 1 and kind = 'user';", }, { createAccount: "testAccount", @@ -1312,7 +1312,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { stmt: &tree.CreatePitr{ Level: tree.PITRLEVELACCOUNT, }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 1 and account_name = 'testAccount' and level = 'account' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 1 and account_name = 'testAccount' and level = 'account' and pitr_status = 1 and kind = 'user';", }, { createAccount: "sys", @@ -1321,7 +1321,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { Level: tree.PITRLEVELACCOUNT, AccountName: "testAccountName", }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and account_name = 'testAccountName' and level = 'account' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and account_name = 'testAccountName' and level = 'account' and pitr_status = 1 and kind = 'user';", }, { createAccount: "sys", @@ -1330,7 +1330,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { Level: tree.PITRLEVELDATABASE, DatabaseName: "testDb", }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and database_name = 'testDb' and level = 'database' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and database_name = 'testDb' and level = 'database' and pitr_status = 1 and kind = 'user';", }, { createAccount: "testAccount", @@ -1339,7 +1339,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { Level: tree.PITRLEVELDATABASE, DatabaseName: "testDb", }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 1 and database_name = 'testDb' and level = 'database' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 1 and database_name = 'testDb' and level = 'database' and pitr_status = 1 and kind = 'user';", }, { createAccount: "sys", @@ -1349,7 +1349,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { DatabaseName: "testDb", TableName: "testTable", }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and database_name = 'testDb' and table_name = 'testTable' and level = 'table' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and database_name = 'testDb' and table_name = 'testTable' and level = 'table' and pitr_status = 1 and kind = 'user';", }, { createAccount: "testAccount", @@ -1359,7 +1359,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { DatabaseName: "testDb", TableName: "testTable", }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 1 and database_name = 'testDb' and table_name = 'testTable' and level = 'table' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 1 and database_name = 'testDb' and table_name = 'testTable' and level = 'table' and pitr_status = 1 and kind = 'user';", }, } @@ -1427,7 +1427,7 @@ func Test_doRestorePitr_Account(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1553,7 +1553,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1672,7 +1672,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_To_new(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1793,7 +1793,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_To_new(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1913,7 +1913,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_Using_cluster(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -2035,7 +2035,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_Using_cluster(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -2155,7 +2155,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_To_new_Using_cluster(t *testi mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -2281,7 +2281,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_To_new_Using_cluster(t *testi mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -2403,7 +2403,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_To_new_Using_cluster(t *testi mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -2543,7 +2543,7 @@ func Test_doCreatePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{}) bh.sql2result[sql] = mrs - sql = fmt.Sprintf("select pitr_id from mo_catalog.mo_pitr where create_account = %d", sysAccountID) + fmt.Sprintf(" and account_name = '%s' and level = 'account' and pitr_status = 1;", sysAccountName) + sql = fmt.Sprintf("select pitr_id from mo_catalog.mo_pitr where create_account = %d", sysAccountID) + fmt.Sprintf(" and account_name = '%s' and level = 'account' and pitr_status = 1 and kind = 'user';", sysAccountName) mrs = newMrsForPitrRecord([][]interface{}{}) bh.sql2result[sql] = mrs @@ -3241,7 +3241,7 @@ func Test_RestoreOtherAccount(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -3330,7 +3330,7 @@ func Test_RestoreOtherAccount(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -3420,7 +3420,7 @@ func Test_RestoreOtherAccount(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -3510,7 +3510,7 @@ func Test_RestoreOtherAccount(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -3575,3 +3575,12 @@ func Test_getPitrLengthAndUnit(t *testing.T) { _, _, _, err = getPitrLengthAndUnit(ctx, bh, "table", "", "", "tbl") assert.Error(t, err) } + +func Test_isReservedPitrName(t *testing.T) { + assert.True(t, isReservedPitrName(SYSMOCATALOGPITR)) + assert.True(t, isReservedPitrName("__mo_data_branch_pitr")) + assert.True(t, isReservedPitrName("__mo_data_branch_pitr_table_123")) + assert.True(t, isReservedPitrName("__mo_data_branch_pitr_database_456")) + assert.False(t, isReservedPitrName("__mo_data_branch_pitrx_table_123")) + assert.False(t, isReservedPitrName("user_pitr")) +} diff --git a/pkg/frontend/show_recovery_window.go b/pkg/frontend/show_recovery_window.go index 1f7544574c587..cd8b59d4725a4 100644 --- a/pkg/frontend/show_recovery_window.go +++ b/pkg/frontend/show_recovery_window.go @@ -198,7 +198,7 @@ func constructRecoveryWindow( " modified_time, pitr_length, pitr_unit, pitr_status, pitr_status_changed_time "+ "from "+ "`%s`.`%s` "+ - " where %s AND pitr_name != '%s'", + " where %s AND pitr_name != '%s' AND kind = 'user'", catalog.MO_CATALOG, catalog.MO_PITR, snapPitrSearchCond, SYSMOCATALOGPITR, ) diff --git a/pkg/sql/compile/ddl.go b/pkg/sql/compile/ddl.go index 8a15f463569b3..e7fb477562281 100644 --- a/pkg/sql/compile/ddl.go +++ b/pkg/sql/compile/ddl.go @@ -186,6 +186,10 @@ func (s *Scope) DropDatabase(c *Compile) error { if err != nil { return err } + databaseID, err := strconv.ParseUint(database.GetDatabaseId(c.proc.Ctx), 10, 64) + if err != nil { + return err + } relations, err := database.Relations(c.proc.Ctx) if err != nil { return err @@ -271,7 +275,7 @@ func (s *Scope) DropDatabase(c *Compile) error { // 4.update mo_pitr table if !needSkipDbs[dbName] { now := c.proc.GetTxnOperator().SnapshotTS().ToStdTime().UTC().UnixNano() - updatePitrSql := fmt.Sprintf("UPDATE `%s`.`%s` SET `%s` = %d, `%s` = %d WHERE `%s` = %d AND `%s` = '%s' AND `%s` = %d AND `%s` = %s", + updatePitrSql := fmt.Sprintf("UPDATE `%s`.`%s` SET `%s` = %d, `%s` = %d WHERE `%s` = %d AND `%s` = '%s' AND `%s` = %d AND `%s` = %s AND `kind` = 'user'", catalog.MO_CATALOG, catalog.MO_PITR, catalog.MO_PITR_STATUS, 0, catalog.MO_PITR_CHANGED_TIME, now, @@ -288,6 +292,13 @@ func (s *Scope) DropDatabase(c *Compile) error { } } + if err = c.cleanupDataBranchPitrsForDroppedDatabase( + uint64(accountId), + databaseID, + ); err != nil { + return err + } + // 5.unregister iscp jobs err = iscp.UnregisterJobsByDBName(c.proc.Ctx, c.proc.GetService(), c.proc.GetTxnOperator(), dbName) if err != nil { @@ -1851,6 +1862,206 @@ func (c *Compile) runSqlWithSystemTenant(sql string) error { ) } +const ( + dataBranchInternalPitrNamePrefix = "__mo_data_branch_pitr" + dataBranchInternalTablePitrPrefix = dataBranchInternalPitrNamePrefix + "_table_" + dataBranchInternalDatabasePitrPrefix = dataBranchInternalPitrNamePrefix + "_database_" + dataBranchInternalPitrKind = "internal" + dataBranchSysMoCatalogPitrName = "sys_mo_catalog_pitr" +) + +func isReservedPitrName(pitrName string) bool { + return pitrName == dataBranchSysMoCatalogPitrName || + pitrName == dataBranchInternalPitrNamePrefix || + strings.HasPrefix(pitrName, dataBranchInternalPitrNamePrefix+"_") +} + +type dataBranchDatabasePitrForCleanup struct { + accountID uint64 + databaseID uint64 + database string + createTime int64 +} + +func (c *Compile) cleanupDataBranchPitrsForDroppedTable(accountID uint64, databaseID uint64, tableID uint64) error { + if err := c.cleanupDataBranchTablePitrAfterDDL(accountID, tableID); err != nil { + return err + } + if err := c.cleanupDataBranchDatabasePitrByIDAfterDDL(accountID, databaseID); err != nil { + return err + } + return c.cleanupDataBranchMoCatalogPitrAfterDDL() +} + +func (c *Compile) cleanupDataBranchPitrsForDroppedDatabase(accountID uint64, databaseID uint64) error { + if err := c.cleanupDataBranchDatabasePitrByIDAfterDDL(accountID, databaseID); err != nil { + return err + } + return c.cleanupDataBranchMoCatalogPitrAfterDDL() +} + +func (c *Compile) cleanupDataBranchTablePitrAfterDDL(accountID uint64, tableID uint64) error { + pitrName := dataBranchInternalTablePitrPrefix + strconv.FormatUint(tableID, 10) + sql := fmt.Sprintf( + "delete from mo_catalog.mo_pitr where create_account = %d and pitr_name = %s and not exists (select 1 from mo_catalog.mo_branch_metadata where table_deleted = false and (table_id = %d or p_table_id = %d))", + accountID, + quoteCompileSQLStringLiteral(pitrName), + tableID, + tableID, + ) + return c.runSqlWithSystemTenant(sql) +} + +func (c *Compile) cleanupDataBranchDatabasePitrByIDAfterDDL(accountID uint64, databaseID uint64) error { + pitr, found, err := c.getDataBranchDatabasePitrForCleanup(accountID, databaseID) + if err != nil || !found { + return err + } + tableIDs, err := c.dataBranchDatabaseTableIDsAt(pitr.accountID, pitr.database, pitr.createTime) + if err != nil { + return err + } + return c.cleanupDataBranchDatabasePitrAfterDDL(pitr.accountID, pitr.databaseID, tableIDs) +} + +func (c *Compile) getDataBranchDatabasePitrForCleanup(accountID uint64, databaseID uint64) (dataBranchDatabasePitrForCleanup, bool, error) { + if databaseID == 0 { + return dataBranchDatabasePitrForCleanup{}, false, nil + } + pitrName := dataBranchInternalDatabasePitrPrefix + strconv.FormatUint(databaseID, 10) + sql := fmt.Sprintf( + "select create_account, obj_id, database_name, create_time from mo_catalog.mo_pitr where create_account = %d and pitr_name = %s and level = '%s' limit 1", + accountID, + quoteCompileSQLStringLiteral(pitrName), + tree.PITRLEVELDATABASE.String(), + ) + rs, err := c.runSqlWithResultAndOptions(sql, int32(catalog.System_Account), executor.StatementOption{}.WithDisableLog()) + if err != nil { + return dataBranchDatabasePitrForCleanup{}, false, err + } + defer rs.Close() + + var ( + pitr dataBranchDatabasePitrForCleanup + found bool + ) + rs.ReadRows(func(rows int, cols []*vector.Vector) bool { + if rows == 0 { + return false + } + pitr = dataBranchDatabasePitrForCleanup{ + accountID: vector.GetFixedAtWithTypeCheck[uint64](cols[0], 0), + databaseID: vector.GetFixedAtWithTypeCheck[uint64](cols[1], 0), + database: cols[2].GetStringAt(0), + createTime: vector.GetFixedAtWithTypeCheck[int64](cols[3], 0), + } + found = true + return false + }) + return pitr, found, nil +} + +func (c *Compile) cleanupDataBranchDatabasePitrAfterDDL(accountID uint64, databaseID uint64, tableIDs []uint64) error { + if len(tableIDs) > 0 { + hasRef, err := c.dataBranchTablesHaveActiveReference(tableIDs) + if err != nil { + return err + } + if hasRef { + return nil + } + } + pitrName := dataBranchInternalDatabasePitrPrefix + strconv.FormatUint(databaseID, 10) + sql := fmt.Sprintf( + "delete from mo_catalog.mo_pitr where create_account = %d and pitr_name = %s", + accountID, + quoteCompileSQLStringLiteral(pitrName), + ) + return c.runSqlWithSystemTenant(sql) +} + +func (c *Compile) dataBranchDatabaseTableIDsAt(accountID uint64, databaseName string, ts int64) ([]uint64, error) { + sql := fmt.Sprintf( + "select rel_id from mo_catalog.mo_tables {MO_TS = %d} where account_id = %d and reldatabase = %s", + ts, + accountID, + quoteCompileSQLStringLiteral(databaseName), + ) + rs, err := c.runSqlWithResultAndOptions(sql, int32(catalog.System_Account), executor.StatementOption{}.WithDisableLog()) + if err != nil { + return nil, err + } + defer rs.Close() + + tableIDs := make([]uint64, 0) + rs.ReadRows(func(rows int, cols []*vector.Vector) bool { + if rows == 0 { + return true + } + tableIDs = append(tableIDs, executor.GetFixedRows[uint64](cols[0])...) + return true + }) + return tableIDs, nil +} + +func (c *Compile) dataBranchTablesHaveActiveReference(tableIDs []uint64) (bool, error) { + if len(tableIDs) == 0 { + return false, nil + } + const batchSize = 512 + for start := 0; start < len(tableIDs); start += batchSize { + end := start + batchSize + if end > len(tableIDs) { + end = len(tableIDs) + } + idList := dataBranchUintListSQL(tableIDs[start:end]) + sql := fmt.Sprintf( + "select table_id from mo_catalog.mo_branch_metadata where table_deleted = false and (table_id in (%s) or p_table_id in (%s)) limit 1", + idList, + idList, + ) + rs, err := c.runSqlWithResultAndOptions(sql, int32(catalog.System_Account), executor.StatementOption{}.WithDisableLog()) + if err != nil { + return false, err + } + hasRef := false + rs.ReadRows(func(rows int, cols []*vector.Vector) bool { + hasRef = rows > 0 + return false + }) + rs.Close() + if hasRef { + return true, nil + } + } + return false, nil +} + +func (c *Compile) cleanupDataBranchMoCatalogPitrAfterDDL() error { + sql := fmt.Sprintf( + "delete from mo_catalog.mo_pitr where pitr_name = '%s' and create_account = 0 and kind = '%s' and not exists (select 1 from mo_catalog.mo_pitr where pitr_name != '%s')", + dataBranchSysMoCatalogPitrName, + dataBranchInternalPitrKind, + dataBranchSysMoCatalogPitrName, + ) + return c.runSqlWithSystemTenant(sql) +} + +func dataBranchUintListSQL(ids []uint64) string { + var sqlBuilder strings.Builder + for i, id := range ids { + if i > 0 { + sqlBuilder.WriteByte(',') + } + sqlBuilder.WriteString(strconv.FormatUint(id, 10)) + } + return sqlBuilder.String() +} + +func quoteCompileSQLStringLiteral(s string) string { + return "'" + strings.ReplaceAll(s, "'", "''") + "'" +} + func (s *Scope) CreateView(c *Compile) error { if s.ScopeAnalyzer == nil { s.ScopeAnalyzer = NewScopeAnalyzer() @@ -2901,6 +3112,7 @@ func (s *Scope) dropTableSingle(c *Compile, qry *plan.DropTable) error { } return err } + tblID = rel.GetTableID(c.proc.Ctx) // Check if the table is a CCPR shared table if !isTemp && !isView && !isSource { @@ -3113,11 +3325,15 @@ func (s *Scope) dropTableSingle(c *Compile, qry *plan.DropTable) error { if err != nil { return err } + databaseID, err := strconv.ParseUint(dbSource.GetDatabaseId(c.proc.Ctx), 10, 64) + if err != nil { + return err + } if !needSkipDbs[dbName] { now := c.proc.GetTxnOperator().SnapshotTS().ToStdTime().UTC().UnixNano() updatePitrSql := fmt.Sprintf( - "update `%s`.`%s` set `%s` = %d, `%s` = %d where `%s` = %d and `%s` = '%s' and `%s` = '%s' and `%s` = %d and `%s` = %d", + "update `%s`.`%s` set `%s` = %d, `%s` = %d where `%s` = %d and `%s` = '%s' and `%s` = '%s' and `%s` = %d and `%s` = %d and `kind` = 'user'", catalog.MO_CATALOG, catalog.MO_PITR, catalog.MO_PITR_STATUS, 0, catalog.MO_PITR_CHANGED_TIME, now, @@ -3157,6 +3373,10 @@ func (s *Scope) dropTableSingle(c *Compile, qry *plan.DropTable) error { } } + if err = c.cleanupDataBranchPitrsForDroppedTable(uint64(accountID), databaseID, tblID); err != nil { + return err + } + ps := partitionservice.GetService(c.proc.GetService()) extr := rel.GetExtraInfo() if extr == nil || @@ -4182,6 +4402,9 @@ func (s *Scope) CreatePitr(c *Compile) error { createPitr := s.Plan.GetDdl().GetCreatePitr() pitrName := createPitr.GetName() pitrLevel := tree.PitrLevel(createPitr.GetLevel()) + if pitrName == "__mo_data_branch_pitr" || strings.HasPrefix(pitrName, "__mo_data_branch_pitr_") { + return moerr.NewInternalError(c.proc.Ctx, "pitr name is reserved") + } // Get current account info accountId, err := defines.GetAccountId(c.proc.Ctx) @@ -4374,7 +4597,13 @@ func (s *Scope) DropPitr(c *Compile) error { if pitrName == "" { return moerr.NewInternalErrorf(c.proc.Ctx, "pitr name is empty") } - const sysMoCatalogPitr = "sys_mo_catalog_pitr" + if isReservedPitrName(pitrName) { + // Reserved PITRs are maintained internally; IF EXISTS keeps cleanup SQL idempotent. + if dropPitr.GetIfExists() { + return nil + } + return moerr.NewInternalError(c.proc.Ctx, "pitr name is reserved") + } const sysAccountId = 0 // Get current account @@ -4384,7 +4613,7 @@ func (s *Scope) DropPitr(c *Compile) error { } // 1. Check if PITR exists - checkSql := fmt.Sprintf("SELECT pitr_id FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d", pitrName, accountId) + checkSql := fmt.Sprintf("SELECT pitr_id FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d AND kind = 'user'", pitrName, accountId) res, err := c.runSqlWithResultAndOptions(checkSql, int32(sysAccountId), executor.StatementOption{}.WithDisableLog()) if err != nil { return err @@ -4398,14 +4627,14 @@ func (s *Scope) DropPitr(c *Compile) error { } // 2. Delete PITR record - deleteSql := fmt.Sprintf("DELETE FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d", pitrName, accountId) + deleteSql := fmt.Sprintf("DELETE FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d AND kind = 'user'", pitrName, accountId) err = c.runSqlWithAccountIdAndOptions(deleteSql, int32(sysAccountId), executor.StatementOption{}.WithDisableLog()) if err != nil { return err } // 3. Check if there are other PITR records besides sys_mo_catalog_pitr - checkOtherSql := fmt.Sprintf("SELECT pitr_id FROM mo_catalog.mo_pitr WHERE pitr_name != '%s'", sysMoCatalogPitr) + checkOtherSql := fmt.Sprintf("SELECT pitr_id FROM mo_catalog.mo_pitr WHERE pitr_name != '%s'", dataBranchSysMoCatalogPitrName) otherRes, err := c.runSqlWithResultAndOptions(checkOtherSql, sysAccountId, executor.StatementOption{}.WithDisableLog()) if err != nil { return err @@ -4413,7 +4642,7 @@ func (s *Scope) DropPitr(c *Compile) error { defer otherRes.Close() if len(otherRes.Batches) == 0 || otherRes.Batches[0].RowCount() == 0 { // 4. No other PITR records, delete sys_mo_catalog_pitr - deleteSysSql := fmt.Sprintf("DELETE FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d", sysMoCatalogPitr, sysAccountId) + deleteSysSql := fmt.Sprintf("DELETE FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d", dataBranchSysMoCatalogPitrName, sysAccountId) err = c.runSqlWithAccountIdAndOptions(deleteSysSql, sysAccountId, executor.StatementOption{}.WithDisableLog()) if err != nil { return err @@ -4440,7 +4669,7 @@ func pitrDupError(c *Compile, createPitr *plan.CreatePitr) error { func getSqlForCheckPitrDup( createPitr *plan.CreatePitr, ) string { - sql := "SELECT pitr_id FROM mo_catalog.mo_pitr WHERE create_account = %d" + sql := "SELECT pitr_id FROM mo_catalog.mo_pitr WHERE create_account = %d AND kind = 'user'" switch tree.PitrLevel(createPitr.GetLevel()) { case tree.PITRLEVELCLUSTER: return getSqlForCheckDupPitrFormat(createPitr.CurrentAccountId, math.MaxUint64) @@ -4459,7 +4688,7 @@ func getSqlForCheckPitrDup( } func getSqlForCheckDupPitrFormat(accountId uint32, objId uint64) string { - return fmt.Sprintf(`SELECT pitr_id FROM mo_catalog.mo_pitr WHERE create_account = %d AND obj_id = %d;`, accountId, objId) + return fmt.Sprintf(`SELECT pitr_id FROM mo_catalog.mo_pitr WHERE create_account = %d AND obj_id = %d AND kind = 'user';`, accountId, objId) } func getPitrObjectId(createPitr *plan.CreatePitr) uint64 { @@ -4527,7 +4756,7 @@ func CheckSysMoCatalogPitrResult(ctx context.Context, vecs []*vector.Vector, new } func getSqlForCheckPitrExists(pitrName string, accountId uint32) string { - return fmt.Sprintf("SELECT pitr_id FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d ORDER BY pitr_id", pitrName, accountId) + return fmt.Sprintf("SELECT pitr_id FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d AND kind = 'user' ORDER BY pitr_id", pitrName, accountId) } func (s *Scope) CreateCDC(c *Compile) error { @@ -5284,7 +5513,7 @@ func (c *Compile) checkPitrGranularity( return err } - sqlCluster := fmt.Sprintf(`SELECT pitr_length,pitr_unit FROM %s.%s WHERE level='cluster' AND account_id = %d`, + sqlCluster := fmt.Sprintf(`SELECT pitr_length,pitr_unit FROM %s.%s WHERE level='cluster' AND account_id = %d AND pitr_status = 1 AND kind = 'user'`, catalog.MO_CATALOG, catalog.MO_PITR, accountId) if res, err := c.runSqlWithResultAndOptions(sqlCluster, int32(catalog.System_Account), executor.StatementOption{}.WithDisableLog()); err == nil { if len(res.Batches) > 0 && res.Batches[0].Vecs[0].Length() > 0 { @@ -5319,20 +5548,20 @@ func (c *Compile) checkPitrGranularity( } checkDBByName := func(nameLower string) (bool, error) { - qDB := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='database' and lower(database_name) = '%s' and account_id = %d`, + qDB := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='database' and lower(database_name) = '%s' and account_id = %d and pitr_status = 1 and kind = 'user'`, catalog.MO_CATALOG, catalog.MO_PITR, nameLower, accountId) if ok, err := checkQuery(qDB); err != nil { return false, err } else if ok { return true, nil } - qDB2 := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='database' and lower(database_name) = '%s'`, + qDB2 := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='database' and lower(database_name) = '%s' and pitr_status = 1 and kind = 'user'`, catalog.MO_CATALOG, catalog.MO_PITR, nameLower) return checkQuery(qDB2) } // 1) account level always checked first for any pattern - qAcc := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='account' and account_id = %d`, + qAcc := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='account' and account_id = %d and pitr_status = 1 and kind = 'user'`, catalog.MO_CATALOG, catalog.MO_PITR, accountId) if ok, err := checkQuery(qAcc); err != nil { return err @@ -5364,7 +5593,7 @@ func (c *Compile) checkPitrGranularity( // 3) table level only for table pattern if !isDB { - qTbl := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='table' and lower(database_name)='%s' and lower(table_name)='%s' and account_id = %d`, + qTbl := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='table' and lower(database_name)='%s' and lower(table_name)='%s' and account_id = %d and pitr_status = 1 and kind = 'user'`, catalog.MO_CATALOG, catalog.MO_PITR, dbNameLower, tblNameLower, accountId) if ok, err := checkQuery(qTbl); err != nil { return err diff --git a/pkg/sql/compile/ddl_test.go b/pkg/sql/compile/ddl_test.go index 9eecf7c2390f3..5d5a9aa4db7df 100644 --- a/pkg/sql/compile/ddl_test.go +++ b/pkg/sql/compile/ddl_test.go @@ -867,6 +867,38 @@ func TestPitrDupError(t *testing.T) { } } +func TestDropPitrReservedName(t *testing.T) { + newDropPitrScope := func(name string, ifExists bool) *Scope { + return &Scope{Plan: &plan2.Plan{Plan: &plan2.Plan_Ddl{Ddl: &plan2.DataDefinition{ + Definition: &plan2.DataDefinition_DropPitr{ + DropPitr: &plan2.DropPitr{ + Name: name, + IfExists: ifExists, + }, + }, + }}}} + } + + compile := &Compile{proc: testutil.NewProc(t)} + for _, name := range []string{ + dataBranchSysMoCatalogPitrName, + dataBranchInternalPitrNamePrefix, + dataBranchInternalPitrNamePrefix + "_table_123", + } { + name := name + t.Run(name+"/without_if_exists", func(t *testing.T) { + err := newDropPitrScope(name, false).DropPitr(compile) + assert.Error(t, err) + assert.Contains(t, err.Error(), "pitr name is reserved") + }) + + t.Run(name+"/with_if_exists", func(t *testing.T) { + err := newDropPitrScope(name, true).DropPitr(compile) + assert.NoError(t, err) + }) + } +} + func TestIsExperimentalEnabled(t *testing.T) { s := newScope(TableClone) diff --git a/pkg/sql/plan/build_ddl.go b/pkg/sql/plan/build_ddl.go index 45ced634ca55d..4e3ddeda21edc 100644 --- a/pkg/sql/plan/build_ddl.go +++ b/pkg/sql/plan/build_ddl.go @@ -4956,8 +4956,7 @@ func buildCreatePitr(stmt *tree.CreatePitr, ctx CompilerContext) (*Plan, error) return nil, moerr.NewInternalErrorf(ctx.GetContext(), "invalid pitr unit %s", pitrUnit) } - // check pitr exists or not - if string(stmt.Name) == SYSMOCATALOGPITR { + if isReservedPitrName(string(stmt.Name)) { return nil, moerr.NewInternalError(ctx.GetContext(), "pitr name is reserved") } @@ -5031,6 +5030,9 @@ func buildCreatePitr(stmt *tree.CreatePitr, ctx CompilerContext) (*Plan, error) func buildDropPitr(stmt *tree.DropPitr, ctx CompilerContext) (*Plan, error) { ddlType := plan.DataDefinition_DROP_PITR // Remove privilege check, no account ID validation + if isReservedPitrName(string(stmt.Name)) && !stmt.IfExists { + return nil, moerr.NewInternalError(ctx.GetContext(), "pitr name is reserved") + } // Build drop pitr plan dropPitr := &plan.DropPitr{ @@ -5050,6 +5052,12 @@ func buildDropPitr(stmt *tree.DropPitr, ctx CompilerContext) (*Plan, error) { }, nil } +func isReservedPitrName(pitrName string) bool { + return pitrName == SYSMOCATALOGPITR || + pitrName == DATA_BRANCH_PITR_NAME_PREFIX || + strings.HasPrefix(pitrName, DATA_BRANCH_PITR_NAME_PREFIX+"_") +} + func buildCreateCDC(stmt *tree.CreateCDC, ctx CompilerContext) (*Plan, error) { accountId, err := ctx.GetAccountId() if err != nil { diff --git a/pkg/sql/plan/build_ddl_test.go b/pkg/sql/plan/build_ddl_test.go index 69d7024f4c4a3..a647d5c40bf0f 100644 --- a/pkg/sql/plan/build_ddl_test.go +++ b/pkg/sql/plan/build_ddl_test.go @@ -657,6 +657,19 @@ func TestBuildCreatePitr(t *testing.T) { assert.Contains(t, err.Error(), "pitr name is reserved") }) + t.Run("reserved data branch pitr name", func(t *testing.T) { + ctx := &MockCompilerContext{} + ctx.GetAccountNameFunc = func() string { return "sys" } + ctx.GetAccountIdFunc = func() (uint32, error) { return 1, nil } + stmt := baseStmt() + for _, name := range []string{"__mo_data_branch_pitr", "__mo_data_branch_pitr_table_123"} { + stmt.Name = tree.Identifier(name) + _, err := buildCreatePitr(stmt, ctx) + assert.Error(t, err) + assert.Contains(t, err.Error(), "pitr name is reserved") + } + }) + t.Run("database level pitr, database not exist", func(t *testing.T) { ctx := &MockCompilerContext{} ctx.GetAccountNameFunc = func() string { return "sys" } @@ -715,6 +728,34 @@ func TestBuildCreatePitr(t *testing.T) { }) } +func TestBuildDropPitrReservedName(t *testing.T) { + ctx := &MockCompilerContext{} + for _, name := range []string{ + SYSMOCATALOGPITR, + DATA_BRANCH_PITR_NAME_PREFIX, + DATA_BRANCH_PITR_NAME_PREFIX + "_table_123", + } { + name := name + t.Run(name+"/without_if_exists", func(t *testing.T) { + stmt := &tree.DropPitr{Name: tree.Identifier(name)} + _, err := buildDropPitr(stmt, ctx) + assert.Error(t, err) + assert.Contains(t, err.Error(), "pitr name is reserved") + }) + + t.Run(name+"/with_if_exists", func(t *testing.T) { + stmt := &tree.DropPitr{Name: tree.Identifier(name), IfExists: true} + built, err := buildDropPitr(stmt, ctx) + assert.NoError(t, err) + require.NotNil(t, built) + dropPitr := built.GetDdl().GetDropPitr() + require.NotNil(t, dropPitr) + assert.True(t, dropPitr.GetIfExists()) + assert.Equal(t, name, dropPitr.GetName()) + }) + } +} + func TestConstructAddedPartitionDefsErrors(t *testing.T) { ctx := NewEmptyCompilerContext() ctx.SetContext(context.Background()) diff --git a/pkg/sql/plan/build_show.go b/pkg/sql/plan/build_show.go index 674e88db69619..317181133b4ae 100644 --- a/pkg/sql/plan/build_show.go +++ b/pkg/sql/plan/build_show.go @@ -37,6 +37,7 @@ const MO_CATALOG_DB_NAME = "mo_catalog" const MO_DEFUALT_HOSTNAME = "localhost" const INFORMATION_SCHEMA = "information_schema" const SYSMOCATALOGPITR = "sys_mo_catalog_pitr" +const DATA_BRANCH_PITR_NAME_PREFIX = "__mo_data_branch_pitr" func buildShowCreateDatabase(stmt *tree.ShowCreateDatabase, ctx CompilerContext) (*Plan, error) { @@ -1002,7 +1003,7 @@ func buildShowPitr(stmt *tree.ShowPitr, ctx CompilerContext) (*Plan, error) { " pitr_length as `PITR_LENGTH`, "+ " pitr_unit as `PITR_UNIT` FROM %s.mo_pitr "+ "where "+ - " create_account = %d and pitr_name != '%s' "+ + " create_account = %d and pitr_name != '%s' and kind = 'user' "+ "ORDER BY "+ " create_time DESC", MO_CATALOG_DB_NAME, curAccountId, SYSMOCATALOGPITR) diff --git a/pkg/vm/engine/disttae/snapshot_scan.go b/pkg/vm/engine/disttae/snapshot_scan.go index 2bea0fb72e27a..7d1d5ce470d36 100644 --- a/pkg/vm/engine/disttae/snapshot_scan.go +++ b/pkg/vm/engine/disttae/snapshot_scan.go @@ -138,7 +138,7 @@ func ScanSnapshotWithCurrentRanges( zap.Int("disk-block-cnt", diskBlockCnt), ) - tombstones, err := tbl.CollectTombstones(ctx, 0, engine.Policy_CollectCommittedTombstones) + tombstones, err := tbl.collectTombstonesAtSnapshot(ctx, 0, engine.Policy_CollectCommittedTombstones, snapshotTS) if err != nil { return err } diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 51686db1ecefb..212a26f6eeed7 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -543,6 +543,20 @@ func (tbl *txnTable) CollectTombstones( ctx context.Context, txnOffset int, policy engine.TombstoneCollectPolicy, +) (engine.Tombstoner, error) { + return tbl.collectTombstonesAtSnapshot( + ctx, + txnOffset, + policy, + types.TimestampToTS(tbl.db.op.SnapshotTS()), + ) +} + +func (tbl *txnTable) collectTombstonesAtSnapshot( + ctx context.Context, + txnOffset int, + policy engine.TombstoneCollectPolicy, + snapshot types.TS, ) (engine.Tombstoner, error) { tombstone := readutil.NewEmptyTombstoneData() @@ -603,8 +617,7 @@ func (tbl *txnTable) CollectTombstones( return nil, err } { - ts := tbl.db.op.SnapshotTS() - iter := state.NewRowsIter(types.TimestampToTS(ts), nil, true) + iter := state.NewRowsIter(snapshot, nil, true) for iter.Next() { entry := iter.Entry() //bid, o := entry.RowID.Decode() @@ -615,7 +628,6 @@ func (tbl *txnTable) CollectTombstones( //tombstone.SortInMemory() //collect committed persisted tombstones from partition state. - snapshot := types.TimestampToTS(tbl.db.op.Txn().SnapshotTS) err = state.CollectTombstoneObjects(snapshot, func(stats *objectio.ObjectStats) { tombstone.AppendFiles(*stats)