From b98ee48104e91893510aa49d3d3827886dab3fc5 Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Wed, 29 Apr 2026 23:25:12 +0800 Subject: [PATCH 1/4] fix: protect data branch history with internal pitr --- pkg/frontend/authenticate_test.go | 2 +- pkg/frontend/data_branch.go | 892 +++++++++++++++++++++++++ pkg/frontend/pitr.go | 39 +- pkg/frontend/pitr_test.go | 77 ++- pkg/frontend/show_recovery_window.go | 2 +- pkg/sql/compile/ddl.go | 282 +++++++- pkg/sql/plan/build_ddl.go | 9 +- pkg/sql/plan/build_ddl_test.go | 13 + pkg/sql/plan/build_show.go | 5 +- pkg/vm/engine/disttae/snapshot_scan.go | 2 +- pkg/vm/engine/disttae/txn_table.go | 18 +- 11 files changed, 1274 insertions(+), 67 deletions(-) diff --git a/pkg/frontend/authenticate_test.go b/pkg/frontend/authenticate_test.go index bab339be80a31..84bb33e7fd52f 100644 --- a/pkg/frontend/authenticate_test.go +++ b/pkg/frontend/authenticate_test.go @@ -11927,7 +11927,7 @@ func TestCheckTimeStampValid(t *testing.T) { func Test_getSqlForCheckDupPitrFormat(t *testing.T) { sql := getSqlForCheckDupPitrFormat(123, 456) - assert.Equal(t, "select pitr_id from mo_catalog.mo_pitr where create_account = 123 and obj_id = 456;", sql) + assert.Equal(t, "select pitr_id from mo_catalog.mo_pitr where create_account = 123 and obj_id = 456 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%';", sql) } func Test_checkPitrDup(t *testing.T) { diff --git a/pkg/frontend/data_branch.go b/pkg/frontend/data_branch.go index f324c21d04791..c4d8ca893b4a3 100644 --- a/pkg/frontend/data_branch.go +++ b/pkg/frontend/data_branch.go @@ -26,6 +26,7 @@ import ( "sync/atomic" "time" + "github.com/google/uuid" "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/malloc" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -49,6 +50,31 @@ import ( "go.uber.org/zap" ) +const ( + dataBranchPitrNamePrefix = "__mo_data_branch_pitr" + dataBranchPitrLength = 1 + dataBranchPitrUnit = "y" + dataBranchPitrKind = "internal" +) + +type dataBranchPitrRecord struct { + pitrName string + level string + accountID uint64 + accountName string + databaseName string + tableName string + objectID uint64 +} + +type dataBranchExistingPitrRecord struct { + exists bool + kind string + active bool + pitrLength uint64 + pitrUnit string +} + func newBranchHashmapAllocator(limitRate float64) *branchHashmapAllocator { throttler := rscthrottler.NewMemThrottler( "DataBranchHashmap", @@ -352,6 +378,10 @@ func dataBranchCreateTable( return } + if err = createDataBranchTablePitr(execCtx.reqCtx, ses, bh, receipt); err != nil { + return + } + return nil } @@ -394,9 +424,823 @@ func dataBranchCreateDatabase( } } + if err = createDataBranchDatabasePitr(execCtx.reqCtx, ses, bh, &stmt.CloneDatabase, receipts); err != nil { + return + } + + return nil +} + +func dataBranchPitrName(level string, objectID string) string { + return fmt.Sprintf("%s_%s_%s", dataBranchPitrNamePrefix, level, objectID) +} + +func dataBranchCloneTxn(ctx context.Context, bh BackgroundExec) (TxnOperator, error) { + be, ok := bh.(*backExec) + if !ok || be.backSes == nil || be.backSes.GetTxnHandler() == nil { + return nil, moerr.NewInternalError(ctx, "data branch pitr requires background transaction") + } + return be.backSes.GetTxnHandler().GetTxn(), nil +} + +func createDataBranchTablePitr( + ctx context.Context, + ses *Session, + bh BackgroundExec, + receipt cloneReceipt, +) error { + if receipt.dstDb == "" || receipt.dstTbl == "" { + return moerr.NewInternalError(ctx, "data branch table pitr requires target table") + } + + cloneTxnOp, err := dataBranchCloneTxn(ctx, bh) + if err != nil { + return err + } + if err = createDataBranchTablePitrForTarget( + ctx, ses, bh, cloneTxnOp, receipt.toAccount, receipt.dstDb, receipt.dstTbl, "target", + ); err != nil { + return err + } + if receipt.srcDb == "" || receipt.srcTbl == "" { + return nil + } + return createDataBranchTablePitrForTarget( + ctx, ses, bh, cloneTxnOp, receipt.srcAccount, receipt.srcDb, receipt.srcTbl, "source", + ) +} + +func createDataBranchDatabasePitr( + ctx context.Context, + ses *Session, + bh BackgroundExec, + stmt *tree.CloneDatabase, + receipts []cloneReceipt, +) error { + if stmt == nil { + return moerr.NewInternalError(ctx, "data branch database pitr requires clone database statement") + } + + var ( + toAccountID uint32 + srcAccountID uint32 + err error + ) + if len(receipts) > 0 { + toAccountID = receipts[0].toAccount + srcAccountID = receipts[0].srcAccount + } else { + var snapshot *plan2.Snapshot + srcAccountID, toAccountID, snapshot, err = getOpAndToAccountId( + ctx, ses, bh, stmt.ToAccountOpt, stmt.AtTsExpr, + ) + if err != nil { + return err + } + if snapshot != nil && snapshot.Tenant != nil { + srcAccountID = snapshot.Tenant.TenantID + } + } + + dstDb := stmt.DstDatabase.String() + if dstDb == "" { + return moerr.NewInternalError(ctx, "data branch database pitr requires target database") + } + cloneTxnOp, err := dataBranchCloneTxn(ctx, bh) + if err != nil { + return err + } + if err = createDataBranchDatabasePitrForTarget( + ctx, ses, bh, cloneTxnOp, toAccountID, dstDb, "target", + ); err != nil { + return err + } + + srcDb := stmt.SrcDatabase.String() + if len(receipts) > 0 && receipts[0].srcDb != "" { + srcDb = receipts[0].srcDb + } + if srcDb == "" { + return nil + } + return createDataBranchDatabasePitrForTarget( + ctx, ses, bh, cloneTxnOp, srcAccountID, srcDb, "source", + ) +} + +func createDataBranchTablePitrForTarget( + ctx context.Context, + ses *Session, + bh BackgroundExec, + cloneTxnOp TxnOperator, + accountID uint32, + dbName string, + tableName string, + role string, +) error { + tableID, err := dataBranchTableID(ctx, ses, cloneTxnOp, accountID, dbName, tableName) + if err != nil { + return err + } + pitrName := dataBranchPitrName("table", strconv.FormatUint(tableID, 10)) + accountName, err := dataBranchPitrAccountName(ctx, ses, bh, accountID) + if err != nil { + return err + } + logutil.Info( + "DataBranch-CreatePITR-Table", + zap.String("role", role), + zap.String("pitr-name", pitrName), + zap.Uint64("table-id", tableID), + zap.Uint32("account-id", accountID), + zap.String("database", dbName), + zap.String("table", tableName), + ) + return createDataBranchPitrRecord(ctx, ses, bh, cloneTxnOp, dataBranchPitrRecord{ + pitrName: pitrName, + level: tree.PITRLEVELTABLE.String(), + accountID: uint64(accountID), + accountName: accountName, + databaseName: dbName, + tableName: tableName, + objectID: tableID, + }) +} + +func createDataBranchDatabasePitrForTarget( + ctx context.Context, + ses *Session, + bh BackgroundExec, + cloneTxnOp TxnOperator, + accountID uint32, + dbName string, + role string, +) error { + databaseID, err := dataBranchDatabaseID(ctx, ses, cloneTxnOp, accountID, dbName) + if err != nil { + return err + } + pitrName := dataBranchPitrName("database", strconv.FormatUint(databaseID, 10)) + accountName, err := dataBranchPitrAccountName(ctx, ses, bh, accountID) + if err != nil { + return err + } + logutil.Info( + "DataBranch-CreatePITR-Database", + zap.String("role", role), + zap.String("pitr-name", pitrName), + zap.Uint64("database-id", databaseID), + zap.Uint32("account-id", accountID), + zap.String("database", dbName), + ) + return createDataBranchPitrRecord(ctx, ses, bh, cloneTxnOp, dataBranchPitrRecord{ + pitrName: pitrName, + level: tree.PITRLEVELDATABASE.String(), + accountID: uint64(accountID), + accountName: accountName, + databaseName: dbName, + objectID: databaseID, + }) +} + +func dataBranchDatabaseID( + ctx context.Context, + ses *Session, + cloneTxnOp TxnOperator, + accountID uint32, + dbName string, +) (uint64, error) { + dbCtx := defines.AttachAccountId(ctx, accountID) + db, err := ses.proc.GetSessionInfo().StorageEngine.Database(dbCtx, dbName, cloneTxnOp) + if err != nil { + return 0, err + } + return strconv.ParseUint(db.GetDatabaseId(dbCtx), 10, 64) +} + +func dataBranchTableID( + ctx context.Context, + ses *Session, + cloneTxnOp TxnOperator, + accountID uint32, + dbName string, + tableName string, +) (uint64, error) { + dbCtx := defines.AttachAccountId(ctx, accountID) + db, err := ses.proc.GetSessionInfo().StorageEngine.Database(dbCtx, dbName, cloneTxnOp) + if err != nil { + return 0, err + } + rel, err := db.Relation(dbCtx, tableName, nil) + if err != nil { + return 0, err + } + return rel.GetTableDef(dbCtx).TblId, nil +} + +func dataBranchPitrAccountName( + ctx context.Context, + ses *Session, + bh BackgroundExec, + accountID uint32, +) (string, error) { + if accountID == sysAccountID { + return sysAccountName, nil + } + if tenantInfo := ses.GetTenantInfo(); tenantInfo != nil && accountID == tenantInfo.GetTenantID() { + return tenantInfo.GetTenant(), nil + } + + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "select account_name from mo_catalog.mo_account where account_id = %d", + accountID, + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return "", err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return "", err + } + if !execResultArrayHasData(erArray) { + return "", moerr.NewInternalErrorf(ctx, "account %d does not exist", accountID) + } + return erArray[0].GetString(sysCtx, 0, 0) +} + +func createDataBranchPitrRecord( + ctx context.Context, + ses *Session, + bh BackgroundExec, + cloneTxnOp TxnOperator, + record dataBranchPitrRecord, +) error { + now := cloneTxnOp.SnapshotTS().ToStdTime().UTC().UnixNano() + existing, err := getDataBranchPitrRecord(ctx, bh, record.pitrName, record.accountID) + if err != nil { + return err + } + if existing.exists { + if err = ensureDataBranchPitrRecord(ctx, bh, now, record, existing); err != nil { + return err + } + return ensureDataBranchMoCatalogPitr( + ctx, ses, bh, cloneTxnOp, now, + ) + } + + pitrID, err := uuid.NewV7() + if err != nil { + return err + } + sql, err := getSqlForCreateDataBranchPitrRecord(ctx, pitrID.String(), now, record) + if err != nil { + return err + } + + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + if err = bh.Exec(sysCtx, sql); err != nil { + // Concurrent branch creation can win the primary key race. Re-read the + // record and normalize it if it is the expected internal PITR. + existing, readErr := getDataBranchPitrRecord(ctx, bh, record.pitrName, record.accountID) + if readErr == nil && existing.exists && existing.kind == dataBranchPitrKind { + if ensureErr := ensureDataBranchPitrRecord(ctx, bh, now, record, existing); ensureErr != nil { + return ensureErr + } + return ensureDataBranchMoCatalogPitr(ctx, ses, bh, cloneTxnOp, now) + } + return err + } + return ensureDataBranchMoCatalogPitr(ctx, ses, bh, cloneTxnOp, now) +} + +func getDataBranchPitrRecord( + ctx context.Context, + bh BackgroundExec, + pitrName string, + accountID uint64, +) (dataBranchExistingPitrRecord, error) { + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "select kind, pitr_status, pitr_length, pitr_unit from mo_catalog.mo_pitr where pitr_name = %s and create_account = %d", + quoteSQLStringLiteral(pitrName), + accountID, + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return dataBranchExistingPitrRecord{}, err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return dataBranchExistingPitrRecord{}, err + } + if !execResultArrayHasData(erArray) { + return dataBranchExistingPitrRecord{}, nil + } + kind, err := erArray[0].GetString(sysCtx, 0, 0) + if err != nil { + return dataBranchExistingPitrRecord{}, err + } + status, err := erArray[0].GetUint64(sysCtx, 0, 1) + if err != nil { + return dataBranchExistingPitrRecord{}, err + } + pitrLength, err := erArray[0].GetUint64(sysCtx, 0, 2) + if err != nil { + return dataBranchExistingPitrRecord{}, err + } + pitrUnit, err := erArray[0].GetString(sysCtx, 0, 3) + if err != nil { + return dataBranchExistingPitrRecord{}, err + } + return dataBranchExistingPitrRecord{ + exists: true, + kind: kind, + active: status == 1, + pitrLength: pitrLength, + pitrUnit: pitrUnit, + }, nil +} + +func ensureDataBranchPitrRecord( + ctx context.Context, + bh BackgroundExec, + now int64, + record dataBranchPitrRecord, + existing dataBranchExistingPitrRecord, +) error { + setParts := []string{ + fmt.Sprintf("modified_time = %d", now), + fmt.Sprintf("level = %s", quoteSQLStringLiteral(record.level)), + fmt.Sprintf("account_id = %d", record.accountID), + fmt.Sprintf("account_name = %s", quoteSQLStringLiteral(record.accountName)), + fmt.Sprintf("database_name = %s", quoteSQLStringLiteral(record.databaseName)), + fmt.Sprintf("table_name = %s", quoteSQLStringLiteral(record.tableName)), + fmt.Sprintf("obj_id = %d", record.objectID), + "pitr_status = 1", + fmt.Sprintf("kind = %s", quoteSQLStringLiteral(dataBranchPitrKind)), + } + if !existing.active { + setParts = append(setParts, fmt.Sprintf("pitr_status_changed_time = %d", now)) + } + needDurationUpdate, err := dataBranchPitrDurationNeedsUpdate(existing.pitrLength, existing.pitrUnit) + if err != nil { + return err + } + if needDurationUpdate { + setParts = append(setParts, + fmt.Sprintf("pitr_length = %d", dataBranchPitrLength), + fmt.Sprintf("pitr_unit = %s", quoteSQLStringLiteral(dataBranchPitrUnit)), + ) + } + + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "update mo_catalog.mo_pitr set %s where pitr_name = %s and create_account = %d", + strings.Join(setParts, ", "), + quoteSQLStringLiteral(record.pitrName), + record.accountID, + ) + return bh.Exec(sysCtx, sql) +} + +func dataBranchPitrDurationNeedsUpdate(oldLength uint64, oldUnit string) (bool, error) { + oldMinTs, err := addTimeSpan(time.Time{}, int(oldLength), oldUnit) + if err != nil { + return false, err + } + newMinTs, err := addTimeSpan(time.Time{}, dataBranchPitrLength, dataBranchPitrUnit) + if err != nil { + return false, err + } + return newMinTs.Before(oldMinTs), nil +} + +func getSqlForCreateDataBranchPitrRecord( + ctx context.Context, + pitrID string, + now int64, + record dataBranchPitrRecord, +) (string, error) { + if err := inputNameIsInvalid(ctx, record.pitrName); err != nil { + return "", err + } + return fmt.Sprintf(`insert into mo_catalog.mo_pitr( + pitr_id, + pitr_name, + create_account, + create_time, + modified_time, + level, + account_id, + account_name, + database_name, + table_name, + obj_id, + pitr_length, + pitr_unit, + pitr_status_changed_time, + kind) values (%s, %s, %d, %d, %d, %s, %d, %s, %s, %s, %d, %d, %s, %d, %s);`, + quoteSQLStringLiteral(pitrID), + quoteSQLStringLiteral(record.pitrName), + record.accountID, + now, + now, + quoteSQLStringLiteral(record.level), + record.accountID, + quoteSQLStringLiteral(record.accountName), + quoteSQLStringLiteral(record.databaseName), + quoteSQLStringLiteral(record.tableName), + record.objectID, + dataBranchPitrLength, + quoteSQLStringLiteral(dataBranchPitrUnit), + now, + quoteSQLStringLiteral(dataBranchPitrKind), + ), nil +} + +func ensureDataBranchMoCatalogPitr( + ctx context.Context, + ses *Session, + bh BackgroundExec, + cloneTxnOp TxnOperator, + now int64, +) error { + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "select pitr_length, pitr_unit from mo_catalog.mo_pitr where pitr_name = %s", + quoteSQLStringLiteral(SYSMOCATALOGPITR), + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return err + } + + if execResultArrayHasData(erArray) { + sysPitrLength, err := erArray[0].GetUint64(sysCtx, 0, 0) + if err != nil { + return err + } + sysPitrUnit, err := erArray[0].GetString(sysCtx, 0, 1) + if err != nil { + return err + } + oldMinTs, err := addTimeSpan(time.Time{}, int(sysPitrLength), sysPitrUnit) + if err != nil { + return err + } + newMinTs, err := addTimeSpan(time.Time{}, dataBranchPitrLength, dataBranchPitrUnit) + if err != nil { + return err + } + setParts := []string{ + fmt.Sprintf("modified_time = %d", now), + "pitr_status = 1", + fmt.Sprintf("kind = %s", quoteSQLStringLiteral(dataBranchPitrKind)), + } + if !newMinTs.Before(oldMinTs) { + sql = fmt.Sprintf( + "update mo_catalog.mo_pitr set %s where pitr_name = %s and create_account = %d", + strings.Join(setParts, ", "), + quoteSQLStringLiteral(SYSMOCATALOGPITR), + sysAccountID, + ) + return bh.Exec(sysCtx, sql) + } + setParts = append(setParts, + fmt.Sprintf("pitr_length = %d", dataBranchPitrLength), + fmt.Sprintf("pitr_unit = %s", quoteSQLStringLiteral(dataBranchPitrUnit)), + ) + sql = fmt.Sprintf( + "update mo_catalog.mo_pitr set %s where pitr_name = %s and create_account = %d", + strings.Join(setParts, ", "), + quoteSQLStringLiteral(SYSMOCATALOGPITR), + sysAccountID, + ) + return bh.Exec(sysCtx, sql) + } + + moCatalogDB, err := ses.proc.GetSessionInfo().StorageEngine.Database( + sysCtx, catalog.MO_CATALOG, cloneTxnOp, + ) + if err != nil { + return err + } + moCatalogID, err := strconv.ParseUint(moCatalogDB.GetDatabaseId(sysCtx), 10, 64) + if err != nil { + return err + } + pitrID, err := uuid.NewV7() + if err != nil { + return err + } + sql, err = getSqlForCreateDataBranchPitrRecord(ctx, pitrID.String(), now, dataBranchPitrRecord{ + pitrName: SYSMOCATALOGPITR, + level: tree.PITRLEVELDATABASE.String(), + accountID: sysAccountID, + accountName: sysAccountName, + databaseName: catalog.MO_CATALOG, + objectID: moCatalogID, + }) + if err != nil { + return err + } + return bh.Exec(sysCtx, sql) +} + +func deleteDataBranchInternalPitrRecord( + ctx context.Context, + bh BackgroundExec, + accountID uint64, + pitrName string, +) error { + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "delete from mo_catalog.mo_pitr where pitr_name = %s and create_account = %d and (kind = %s or pitr_name like %s)", + quoteSQLStringLiteral(pitrName), + accountID, + quoteSQLStringLiteral(dataBranchPitrKind), + quoteSQLStringLiteral(dataBranchPitrName("", "")+"%"), + ) + if err := bh.Exec(sysCtx, sql); err != nil { + return err + } + return cleanupDataBranchMoCatalogPitrIfUnused(ctx, bh) +} + +func cleanupDataBranchMoCatalogPitrIfUnused(ctx context.Context, bh BackgroundExec) error { + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "select pitr_id from mo_catalog.mo_pitr where pitr_name != %s limit 1", + quoteSQLStringLiteral(SYSMOCATALOGPITR), + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return err + } + if execResultArrayHasData(erArray) { + return nil + } + sql = fmt.Sprintf( + "delete from mo_catalog.mo_pitr where pitr_name = %s and create_account = %d and kind = %s", + quoteSQLStringLiteral(SYSMOCATALOGPITR), + sysAccountID, + quoteSQLStringLiteral(dataBranchPitrKind), + ) + return bh.Exec(sysCtx, sql) +} + +func cleanupDataBranchTablePitrIfUnused( + ctx context.Context, + bh BackgroundExec, + accountID uint64, + tableID uint64, +) error { + hasRef, err := dataBranchTablesHaveActiveReference(ctx, bh, []uint64{tableID}) + if err != nil { + return err + } + if hasRef { + return nil + } + return deleteDataBranchInternalPitrRecord( + ctx, + bh, + accountID, + dataBranchPitrName("table", strconv.FormatUint(tableID, 10)), + ) +} + +func cleanupAllDataBranchTablePitrsIfUnused(ctx context.Context, bh BackgroundExec) error { + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "select create_account, obj_id from mo_catalog.mo_pitr where (kind = %s or pitr_name like %s) and level = %s and pitr_name like %s", + quoteSQLStringLiteral(dataBranchPitrKind), + quoteSQLStringLiteral(dataBranchPitrName("", "")+"%"), + quoteSQLStringLiteral(tree.PITRLEVELTABLE.String()), + quoteSQLStringLiteral(dataBranchPitrName("table", "")+"%"), + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return err + } + if !execResultArrayHasData(erArray) { + return nil + } + + type tablePitr struct { + accountID uint64 + tableID uint64 + } + pitrs := make([]tablePitr, 0, erArray[0].GetRowCount()) + for i := uint64(0); i < erArray[0].GetRowCount(); i++ { + accountID, err := erArray[0].GetUint64(sysCtx, i, 0) + if err != nil { + return err + } + tableID, err := erArray[0].GetUint64(sysCtx, i, 1) + if err != nil { + return err + } + pitrs = append(pitrs, tablePitr{accountID: accountID, tableID: tableID}) + } + + for _, pitr := range pitrs { + if err = cleanupDataBranchTablePitrIfUnused(ctx, bh, pitr.accountID, pitr.tableID); err != nil { + return err + } + } return nil } +func cleanupAllDataBranchDatabasePitrsIfUnused(ctx context.Context, bh BackgroundExec) error { + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "select create_account, obj_id, database_name, create_time from mo_catalog.mo_pitr where (kind = %s or pitr_name like %s) and level = %s and pitr_name like %s", + quoteSQLStringLiteral(dataBranchPitrKind), + quoteSQLStringLiteral(dataBranchPitrName("", "")+"%"), + quoteSQLStringLiteral(tree.PITRLEVELDATABASE.String()), + quoteSQLStringLiteral(dataBranchPitrName("database", "")+"%"), + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return err + } + if !execResultArrayHasData(erArray) { + return nil + } + + type dbPitr struct { + accountID uint64 + databaseID uint64 + database string + createTime int64 + } + pitrs := make([]dbPitr, 0, erArray[0].GetRowCount()) + for i := uint64(0); i < erArray[0].GetRowCount(); i++ { + accountID, err := erArray[0].GetUint64(sysCtx, i, 0) + if err != nil { + return err + } + databaseID, err := erArray[0].GetUint64(sysCtx, i, 1) + if err != nil { + return err + } + database, err := erArray[0].GetString(sysCtx, i, 2) + if err != nil { + return err + } + createTime, err := erArray[0].GetInt64(sysCtx, i, 3) + if err != nil { + return err + } + pitrs = append(pitrs, dbPitr{ + accountID: accountID, + databaseID: databaseID, + database: database, + createTime: createTime, + }) + } + + for _, pitr := range pitrs { + tableIDs, err := dataBranchDatabaseTableIDsAt(ctx, bh, pitr.accountID, pitr.database, pitr.createTime) + if err != nil { + return err + } + if err = cleanupDataBranchDatabasePitrIfUnused( + ctx, bh, pitr.accountID, pitr.databaseID, tableIDs, + ); err != nil { + return err + } + } + return nil +} + +func cleanupDataBranchDatabasePitrIfUnused( + ctx context.Context, + bh BackgroundExec, + accountID uint64, + databaseID uint64, + tableIDs []uint64, +) error { + if len(tableIDs) > 0 { + hasRef, err := dataBranchTablesHaveActiveReference(ctx, bh, tableIDs) + if err != nil { + return err + } + if hasRef { + return nil + } + } + return deleteDataBranchInternalPitrRecord( + ctx, + bh, + accountID, + dataBranchPitrName("database", strconv.FormatUint(databaseID, 10)), + ) +} + +func dataBranchDatabaseTableIDsAt( + ctx context.Context, + bh BackgroundExec, + accountID uint64, + databaseName string, + ts int64, +) ([]uint64, error) { + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + sql := fmt.Sprintf( + "select rel_id from mo_catalog.mo_tables {MO_TS = %d} where account_id = %d and reldatabase = %s", + ts, + accountID, + quoteSQLStringLiteral(databaseName), + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return nil, err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return nil, err + } + if !execResultArrayHasData(erArray) { + return nil, nil + } + tableIDs := make([]uint64, 0, erArray[0].GetRowCount()) + for i := uint64(0); i < erArray[0].GetRowCount(); i++ { + tableID, err := erArray[0].GetUint64(sysCtx, i, 0) + if err != nil { + return nil, err + } + tableIDs = append(tableIDs, tableID) + } + return tableIDs, nil +} + +func dataBranchTablesHaveActiveReference( + ctx context.Context, + bh BackgroundExec, + tableIDs []uint64, +) (bool, error) { + if len(tableIDs) == 0 { + return false, nil + } + sysCtx := defines.AttachAccountId(ctx, sysAccountID) + const batchSize = 512 + for start := 0; start < len(tableIDs); start += batchSize { + end := start + batchSize + if end > len(tableIDs) { + end = len(tableIDs) + } + idList := dataBranchUintListSQL(tableIDs[start:end]) + sql := fmt.Sprintf( + "select table_id from mo_catalog.mo_branch_metadata where table_deleted = false and (table_id in (%s) or p_table_id in (%s)) limit 1", + idList, + idList, + ) + bh.ClearExecResultSet() + if err := bh.Exec(sysCtx, sql); err != nil { + return false, err + } + erArray, err := getResultSet(sysCtx, bh) + if err != nil { + return false, err + } + if execResultArrayHasData(erArray) { + return true, nil + } + } + return false, nil +} + +func dataBranchUintListSQL(ids []uint64) string { + var sqlBuilder strings.Builder + for i, id := range ids { + if i > 0 { + sqlBuilder.WriteByte(',') + } + sqlBuilder.WriteString(strconv.FormatUint(id, 10)) + } + return sqlBuilder.String() +} + func markBranchTablesDeleted( ctx context.Context, ses *Session, @@ -518,6 +1362,16 @@ func dataBranchDeleteTable( return } + if err = cleanupDataBranchTablePitrIfUnused(execCtx.reqCtx, bh, uint64(accId), tblID); err != nil { + return + } + if err = cleanupAllDataBranchTablePitrsIfUnused(execCtx.reqCtx, bh); err != nil { + return + } + if err = cleanupAllDataBranchDatabasePitrsIfUnused(execCtx.reqCtx, bh); err != nil { + return + } + return nil } @@ -545,6 +1399,8 @@ func dataBranchDeleteDatabase( dbName = stmt.DatabaseName accId uint32 sqlRet executor.Result + dbID uint64 + dbFound bool tableIDs []uint64 ) @@ -552,6 +1408,25 @@ func dataBranchDeleteDatabase( return } + if sqlRet, err = runSql( + execCtx.reqCtx, ses, bh, fmt.Sprintf( + "select dat_id from %s.%s where account_id = %d and datname = '%s'", + catalog.MO_CATALOG, catalog.MO_DATABASE, accId, dbName, + ), nil, nil, + ); err != nil { + return + } + + sqlRet.ReadRows(func(rows int, cols []*vector.Vector) bool { + if rows == 0 { + return false + } + dbID = vector.GetFixedAtWithTypeCheck[uint64](cols[0], 0) + dbFound = true + return false + }) + sqlRet.Close() + if sqlRet, err = runSql( execCtx.reqCtx, ses, bh, fmt.Sprintf( "select rel_id from %s.%s where account_id = %d and reldatabase = '%s'", @@ -586,6 +1461,23 @@ func dataBranchDeleteDatabase( return } + for _, tableID := range tableIDs { + if err = cleanupDataBranchTablePitrIfUnused(execCtx.reqCtx, bh, uint64(accId), tableID); err != nil { + return + } + } + if err = cleanupAllDataBranchTablePitrsIfUnused(execCtx.reqCtx, bh); err != nil { + return + } + if dbFound { + if err = cleanupDataBranchDatabasePitrIfUnused(execCtx.reqCtx, bh, uint64(accId), dbID, tableIDs); err != nil { + return + } + } + if err = cleanupAllDataBranchDatabasePitrsIfUnused(execCtx.reqCtx, bh); err != nil { + return + } + return nil } diff --git a/pkg/frontend/pitr.go b/pkg/frontend/pitr.go index a48a71cdb10f7..bf3d849a24407 100644 --- a/pkg/frontend/pitr.go +++ b/pkg/frontend/pitr.go @@ -54,15 +54,15 @@ var ( pitr_unit, pitr_status_changed_time) values ('%s', '%s', %d, %d, %d, '%s', %d, '%s', '%s', '%s', %d, %d, '%s', %d);` - checkPitrFormat = `select pitr_id from mo_catalog.mo_pitr where pitr_name = "%s" and create_account = %d order by pitr_id;` + checkPitrFormat = `select pitr_id from mo_catalog.mo_pitr where pitr_name = "%s" and create_account = %d and kind = 'user' order by pitr_id;` - dropPitrFormat = `delete from mo_catalog.mo_pitr where pitr_name = '%s' and create_account = %d order by pitr_id;` + dropPitrFormat = `delete from mo_catalog.mo_pitr where pitr_name = '%s' and create_account = %d and kind = 'user' order by pitr_id;` - alterPitrFormat = `update mo_catalog.mo_pitr set modified_time = %d, pitr_length = %d, pitr_unit = '%s' where pitr_name = '%s' and create_account = %d;` + alterPitrFormat = `update mo_catalog.mo_pitr set modified_time = %d, pitr_length = %d, pitr_unit = '%s' where pitr_name = '%s' and create_account = %d and kind = 'user';` getPitrFormat = `select * from mo_catalog.mo_pitr` - checkDupPitrFormat = `select pitr_id from mo_catalog.mo_pitr where create_account = %d and obj_id = %d;` + checkDupPitrFormat = `select pitr_id from mo_catalog.mo_pitr where create_account = %d and obj_id = %d and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%%';` getSqlForCheckDatabaseFmt = `select dat_id from mo_catalog.mo_database {MO_TS = %d} where datname = '%s';` @@ -73,9 +73,9 @@ var ( getPubInfoWithPitrFormat = `select pub_name, database_name, database_id, table_list, account_list, created_time, update_time, owner, creator, comment from mo_catalog.mo_pubs {MO_TS = %d} where account_id = %d and database_name = '%s';` // update mo_pitr object id - updateMoPitrAccountObjectIdFmt = `update mo_catalog.mo_pitr set pitr_status = 0, pitr_status_changed_time = %d where account_name = '%s' and pitr_status = 1 and obj_id = %d;` + updateMoPitrAccountObjectIdFmt = `update mo_catalog.mo_pitr set pitr_status = 0, pitr_status_changed_time = %d where account_name = '%s' and pitr_status = 1 and obj_id = %d and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%%';` - getLengthAndUnitFmt = `select pitr_length, pitr_unit from mo_catalog.mo_pitr where account_id = %d and level = '%s'` + getLengthAndUnitFmt = `select pitr_length, pitr_unit from mo_catalog.mo_pitr where account_id = %d and level = '%s' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%%'` ) type pitrRecord struct { @@ -98,6 +98,12 @@ const ( SYSMOCATALOGPITR = "sys_mo_catalog_pitr" ) +func isReservedPitrName(pitrName string) bool { + return pitrName == SYSMOCATALOGPITR || + pitrName == dataBranchPitrNamePrefix || + strings.HasPrefix(pitrName, dataBranchPitrNamePrefix+"_") +} + func getSqlForCreatePitr( ctx context.Context, pitrId, pitrName string, @@ -232,14 +238,14 @@ func getSqlForCheckPitrDup(createAccount string, createAccountId uint64, stmt *t return getSqlForCheckDupPitrFormat(createAccountId, math.MaxUint64) case tree.PITRLEVELACCOUNT: if len(stmt.AccountName) > 0 { - return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and account_name = '%s' and level = 'account' and pitr_status = 1;", stmt.AccountName) + return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and account_name = '%s' and level = 'account' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%%';", stmt.AccountName) } else { - return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and account_name = '%s' and level = 'account' and pitr_status = 1;", createAccount) + return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and account_name = '%s' and level = 'account' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%%';", createAccount) } case tree.PITRLEVELDATABASE: - return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and database_name = '%s' and level = 'database' and pitr_status = 1;", stmt.DatabaseName) + return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and database_name = '%s' and level = 'database' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%%';", stmt.DatabaseName) case tree.PITRLEVELTABLE: - return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and database_name = '%s' and table_name = '%s' and level = 'table' and pitr_status = 1;", stmt.DatabaseName, stmt.TableName) + return fmt.Sprintf(sql, createAccountId) + fmt.Sprintf(" and database_name = '%s' and table_name = '%s' and level = 'table' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%%';", stmt.DatabaseName, stmt.TableName) } return sql } @@ -338,7 +344,7 @@ func doCreatePitr(ctx context.Context, ses *Session, stmt *tree.CreatePitr) erro // 6.check pitr exists or not pitrName = string(stmt.Name) - if pitrName == SYSMOCATALOGPITR { + if isReservedPitrName(pitrName) { return moerr.NewInternalError(ctx, "pitr name is reserved") } @@ -785,6 +791,9 @@ func doDropPitr(ctx context.Context, ses *Session, stmt *tree.DropPitr) (err err // check pitr exists or not tenantInfo := ses.GetTenantInfo() + if isReservedPitrName(string(stmt.Name)) { + return moerr.NewInternalError(ctx, "pitr name is reserved") + } pitrExist, err = checkPitrExistOrNot(ctx, bh, string(stmt.Name), uint64(tenantInfo.GetTenantID())) if err != nil { return err @@ -875,6 +884,9 @@ func doAlterPitr(ctx context.Context, ses *Session, stmt *tree.AlterPitr) (err e // check pitr exists or not tenantInfo := ses.GetTenantInfo() + if isReservedPitrName(string(stmt.Name)) { + return moerr.NewInternalError(ctx, "pitr name is reserved") + } pitrExist, err = checkPitrExistOrNot(ctx, bh, string(stmt.Name), uint64(tenantInfo.GetTenantID())) if err != nil { return err @@ -935,6 +947,9 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (s // get pitr name pitrName := string(stmt.Name) + if isReservedPitrName(pitrName) { + return stats, moerr.NewInternalError(ctx, "pitr name is reserved") + } accountName := string(stmt.AccountName) dbName := string(stmt.DatabaseName) tblName := string(stmt.TableName) @@ -1910,7 +1925,7 @@ func getPitrByName(ctx context.Context, bh BackgroundExec, pitrName string, acco return nil, err } - sql := fmt.Sprintf("%s where pitr_name = '%s' and create_account = %d", getPitrFormat, pitrName, accountId) + sql := fmt.Sprintf("%s where pitr_name = '%s' and create_account = %d and kind = 'user'", getPitrFormat, pitrName, accountId) if records, err := getPitrRecords(newCtx, bh, sql); err != nil { return nil, err } else if len(records) != 1 { diff --git a/pkg/frontend/pitr_test.go b/pkg/frontend/pitr_test.go index b2b48c9ed636b..2236fd9900f3e 100644 --- a/pkg/frontend/pitr_test.go +++ b/pkg/frontend/pitr_test.go @@ -289,7 +289,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -369,7 +369,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -451,7 +451,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -534,7 +534,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -618,7 +618,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -716,7 +716,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -813,7 +813,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -911,7 +911,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1007,7 +1007,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1088,7 +1088,7 @@ func Test_doRestorePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1172,7 +1172,7 @@ func Test_doRestorePitrValid(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1253,7 +1253,7 @@ func Test_doRestorePitrValid(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1296,7 +1296,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { stmt: &tree.CreatePitr{ Level: tree.PITRLEVELCLUSTER, }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and obj_id = 18446744073709551615;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and obj_id = 18446744073709551615 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%';", }, { createAccount: "sys", @@ -1304,7 +1304,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { stmt: &tree.CreatePitr{ Level: tree.PITRLEVELACCOUNT, }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and account_name = 'sys' and level = 'account' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and account_name = 'sys' and level = 'account' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%';", }, { createAccount: "testAccount", @@ -1312,7 +1312,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { stmt: &tree.CreatePitr{ Level: tree.PITRLEVELACCOUNT, }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 1 and account_name = 'testAccount' and level = 'account' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 1 and account_name = 'testAccount' and level = 'account' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%';", }, { createAccount: "sys", @@ -1321,7 +1321,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { Level: tree.PITRLEVELACCOUNT, AccountName: "testAccountName", }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and account_name = 'testAccountName' and level = 'account' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and account_name = 'testAccountName' and level = 'account' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%';", }, { createAccount: "sys", @@ -1330,7 +1330,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { Level: tree.PITRLEVELDATABASE, DatabaseName: "testDb", }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and database_name = 'testDb' and level = 'database' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and database_name = 'testDb' and level = 'database' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%';", }, { createAccount: "testAccount", @@ -1339,7 +1339,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { Level: tree.PITRLEVELDATABASE, DatabaseName: "testDb", }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 1 and database_name = 'testDb' and level = 'database' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 1 and database_name = 'testDb' and level = 'database' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%';", }, { createAccount: "sys", @@ -1349,7 +1349,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { DatabaseName: "testDb", TableName: "testTable", }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and database_name = 'testDb' and table_name = 'testTable' and level = 'table' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 0 and database_name = 'testDb' and table_name = 'testTable' and level = 'table' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%';", }, { createAccount: "testAccount", @@ -1359,7 +1359,7 @@ func TestGetSqlForCheckPitrDup(t *testing.T) { DatabaseName: "testDb", TableName: "testTable", }, - expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 1 and database_name = 'testDb' and table_name = 'testTable' and level = 'table' and pitr_status = 1;", + expected: "select pitr_id from mo_catalog.mo_pitr where create_account = 1 and database_name = 'testDb' and table_name = 'testTable' and level = 'table' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%';", }, } @@ -1427,7 +1427,7 @@ func Test_doRestorePitr_Account(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1553,7 +1553,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1672,7 +1672,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_To_new(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1793,7 +1793,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_To_new(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -1913,7 +1913,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_Using_cluster(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -2035,7 +2035,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_Using_cluster(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -2155,7 +2155,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_To_new_Using_cluster(t *testi mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -2281,7 +2281,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_To_new_Using_cluster(t *testi mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -2403,7 +2403,7 @@ func Test_doRestorePitr_Account_Sys_Restore_Normal_To_new_Using_cluster(t *testi mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -2543,7 +2543,7 @@ func Test_doCreatePitr(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{}) bh.sql2result[sql] = mrs - sql = fmt.Sprintf("select pitr_id from mo_catalog.mo_pitr where create_account = %d", sysAccountID) + fmt.Sprintf(" and account_name = '%s' and level = 'account' and pitr_status = 1;", sysAccountName) + sql = fmt.Sprintf("select pitr_id from mo_catalog.mo_pitr where create_account = %d", sysAccountID) + fmt.Sprintf(" and account_name = '%s' and level = 'account' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%%';", sysAccountName) mrs = newMrsForPitrRecord([][]interface{}{}) bh.sql2result[sql] = mrs @@ -3241,7 +3241,7 @@ func Test_RestoreOtherAccount(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -3330,7 +3330,7 @@ func Test_RestoreOtherAccount(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -3420,7 +3420,7 @@ func Test_RestoreOtherAccount(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -3510,7 +3510,7 @@ func Test_RestoreOtherAccount(t *testing.T) { mrs := newMrsForPitrRecord([][]interface{}{{"018ee4cd-5991-7caa-b75d-f9290144bd9f"}}) bh.sql2result[sql] = mrs - sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0" + sql = "select * from mo_catalog.mo_pitr where pitr_name = 'pitr01' and create_account = 0 and kind = 'user'" mrs = newMrsForPitrRecord([][]interface{}{{ "018ee4cd-5991-7caa-b75d-f9290144bd9f", "pitr01", @@ -3575,3 +3575,12 @@ func Test_getPitrLengthAndUnit(t *testing.T) { _, _, _, err = getPitrLengthAndUnit(ctx, bh, "table", "", "", "tbl") assert.Error(t, err) } + +func Test_isReservedPitrName(t *testing.T) { + assert.True(t, isReservedPitrName(SYSMOCATALOGPITR)) + assert.True(t, isReservedPitrName("__mo_data_branch_pitr")) + assert.True(t, isReservedPitrName("__mo_data_branch_pitr_table_123")) + assert.True(t, isReservedPitrName("__mo_data_branch_pitr_database_456")) + assert.False(t, isReservedPitrName("__mo_data_branch_pitrx_table_123")) + assert.False(t, isReservedPitrName("user_pitr")) +} diff --git a/pkg/frontend/show_recovery_window.go b/pkg/frontend/show_recovery_window.go index 1f7544574c587..c3094ee9d7c3f 100644 --- a/pkg/frontend/show_recovery_window.go +++ b/pkg/frontend/show_recovery_window.go @@ -198,7 +198,7 @@ func constructRecoveryWindow( " modified_time, pitr_length, pitr_unit, pitr_status, pitr_status_changed_time "+ "from "+ "`%s`.`%s` "+ - " where %s AND pitr_name != '%s'", + " where %s AND pitr_name != '%s' AND kind = 'user' AND pitr_name not like '__mo_data_branch_pitr_%%'", catalog.MO_CATALOG, catalog.MO_PITR, snapPitrSearchCond, SYSMOCATALOGPITR, ) diff --git a/pkg/sql/compile/ddl.go b/pkg/sql/compile/ddl.go index 8a15f463569b3..2d16a7485866c 100644 --- a/pkg/sql/compile/ddl.go +++ b/pkg/sql/compile/ddl.go @@ -271,7 +271,7 @@ func (s *Scope) DropDatabase(c *Compile) error { // 4.update mo_pitr table if !needSkipDbs[dbName] { now := c.proc.GetTxnOperator().SnapshotTS().ToStdTime().UTC().UnixNano() - updatePitrSql := fmt.Sprintf("UPDATE `%s`.`%s` SET `%s` = %d, `%s` = %d WHERE `%s` = %d AND `%s` = '%s' AND `%s` = %d AND `%s` = %s", + updatePitrSql := fmt.Sprintf("UPDATE `%s`.`%s` SET `%s` = %d, `%s` = %d WHERE `%s` = %d AND `%s` = '%s' AND `%s` = %d AND `%s` = %s AND `kind` = 'user' AND `pitr_name` not like '__mo_data_branch_pitr_%%'", catalog.MO_CATALOG, catalog.MO_PITR, catalog.MO_PITR_STATUS, 0, catalog.MO_PITR_CHANGED_TIME, now, @@ -288,6 +288,10 @@ func (s *Scope) DropDatabase(c *Compile) error { } } + if err = c.cleanupDataBranchInternalPitrsAfterDDL(); err != nil { + return err + } + // 5.unregister iscp jobs err = iscp.UnregisterJobsByDBName(c.proc.Ctx, c.proc.GetService(), c.proc.GetTxnOperator(), dbName) if err != nil { @@ -1851,6 +1855,250 @@ func (c *Compile) runSqlWithSystemTenant(sql string) error { ) } +const ( + dataBranchInternalPitrNamePrefix = "__mo_data_branch_pitr" + dataBranchInternalTablePitrPrefix = dataBranchInternalPitrNamePrefix + "_table_" + dataBranchInternalDatabasePitrPrefix = dataBranchInternalPitrNamePrefix + "_database_" + dataBranchInternalPitrKind = "internal" + dataBranchSysMoCatalogPitrName = "sys_mo_catalog_pitr" +) + +type dataBranchTablePitrForCleanup struct { + accountID uint64 + tableID uint64 +} + +type dataBranchDatabasePitrForCleanup struct { + accountID uint64 + databaseID uint64 + database string + createTime int64 +} + +func (c *Compile) cleanupDataBranchInternalPitrsAfterDDL() error { + if err := c.cleanupDataBranchTablePitrsAfterDDL(); err != nil { + return err + } + if err := c.cleanupDataBranchDatabasePitrsAfterDDL(); err != nil { + return err + } + return c.cleanupDataBranchMoCatalogPitrAfterDDL() +} + +func (c *Compile) cleanupDataBranchTablePitrsAfterDDL() error { + pitrs, err := c.getDataBranchTablePitrsForCleanup() + if err != nil { + return err + } + for _, pitr := range pitrs { + if err = c.cleanupDataBranchTablePitrAfterDDL(pitr.accountID, pitr.tableID); err != nil { + return err + } + } + return nil +} + +func (c *Compile) getDataBranchTablePitrsForCleanup() ([]dataBranchTablePitrForCleanup, error) { + sql := fmt.Sprintf( + "select create_account, obj_id from mo_catalog.mo_pitr where (kind = '%s' or pitr_name like '%s_%%') and level = '%s' and pitr_name like '%s%%'", + dataBranchInternalPitrKind, + dataBranchInternalPitrNamePrefix, + tree.PITRLEVELTABLE.String(), + dataBranchInternalTablePitrPrefix, + ) + rs, err := c.runSqlWithResultAndOptions(sql, int32(catalog.System_Account), executor.StatementOption{}.WithDisableLog()) + if err != nil { + return nil, err + } + defer rs.Close() + + pitrs := make([]dataBranchTablePitrForCleanup, 0) + rs.ReadRows(func(rows int, cols []*vector.Vector) bool { + if rows == 0 { + return true + } + accountIDs := executor.GetFixedRows[uint64](cols[0]) + tableIDs := executor.GetFixedRows[uint64](cols[1]) + for i := range accountIDs { + pitrs = append(pitrs, dataBranchTablePitrForCleanup{ + accountID: accountIDs[i], + tableID: tableIDs[i], + }) + } + return true + }) + return pitrs, nil +} + +func (c *Compile) cleanupDataBranchTablePitrAfterDDL(accountID uint64, tableID uint64) error { + sql := fmt.Sprintf( + "delete from mo_catalog.mo_pitr where create_account = %d and pitr_name = '%s%d' and (kind = '%s' or pitr_name like '%s_%%') and not exists (select 1 from mo_catalog.mo_branch_metadata where table_deleted = false and (table_id = %d or p_table_id = %d))", + accountID, + dataBranchInternalTablePitrPrefix, + tableID, + dataBranchInternalPitrKind, + dataBranchInternalPitrNamePrefix, + tableID, + tableID, + ) + return c.runSqlWithSystemTenant(sql) +} + +func (c *Compile) cleanupDataBranchDatabasePitrsAfterDDL() error { + pitrs, err := c.getDataBranchDatabasePitrsForCleanup() + if err != nil { + return err + } + for _, pitr := range pitrs { + tableIDs, err := c.dataBranchDatabaseTableIDsAt(pitr.accountID, pitr.database, pitr.createTime) + if err != nil { + return err + } + if err = c.cleanupDataBranchDatabasePitrAfterDDL(pitr.accountID, pitr.databaseID, tableIDs); err != nil { + return err + } + } + return nil +} + +func (c *Compile) getDataBranchDatabasePitrsForCleanup() ([]dataBranchDatabasePitrForCleanup, error) { + sql := fmt.Sprintf( + "select create_account, obj_id, database_name, create_time from mo_catalog.mo_pitr where (kind = '%s' or pitr_name like '%s_%%') and level = '%s' and pitr_name like '%s%%'", + dataBranchInternalPitrKind, + dataBranchInternalPitrNamePrefix, + tree.PITRLEVELDATABASE.String(), + dataBranchInternalDatabasePitrPrefix, + ) + rs, err := c.runSqlWithResultAndOptions(sql, int32(catalog.System_Account), executor.StatementOption{}.WithDisableLog()) + if err != nil { + return nil, err + } + defer rs.Close() + + pitrs := make([]dataBranchDatabasePitrForCleanup, 0) + rs.ReadRows(func(rows int, cols []*vector.Vector) bool { + if rows == 0 { + return true + } + accountIDs := executor.GetFixedRows[uint64](cols[0]) + databaseIDs := executor.GetFixedRows[uint64](cols[1]) + createTimes := executor.GetFixedRows[int64](cols[3]) + for i := range accountIDs { + pitrs = append(pitrs, dataBranchDatabasePitrForCleanup{ + accountID: accountIDs[i], + databaseID: databaseIDs[i], + database: cols[2].GetStringAt(i), + createTime: createTimes[i], + }) + } + return true + }) + return pitrs, nil +} + +func (c *Compile) cleanupDataBranchDatabasePitrAfterDDL(accountID uint64, databaseID uint64, tableIDs []uint64) error { + if len(tableIDs) > 0 { + hasRef, err := c.dataBranchTablesHaveActiveReference(tableIDs) + if err != nil { + return err + } + if hasRef { + return nil + } + } + sql := fmt.Sprintf( + "delete from mo_catalog.mo_pitr where create_account = %d and pitr_name = '%s%d' and (kind = '%s' or pitr_name like '%s_%%')", + accountID, + dataBranchInternalDatabasePitrPrefix, + databaseID, + dataBranchInternalPitrKind, + dataBranchInternalPitrNamePrefix, + ) + return c.runSqlWithSystemTenant(sql) +} + +func (c *Compile) dataBranchDatabaseTableIDsAt(accountID uint64, databaseName string, ts int64) ([]uint64, error) { + sql := fmt.Sprintf( + "select rel_id from mo_catalog.mo_tables {MO_TS = %d} where account_id = %d and reldatabase = %s", + ts, + accountID, + quoteCompileSQLStringLiteral(databaseName), + ) + rs, err := c.runSqlWithResultAndOptions(sql, int32(catalog.System_Account), executor.StatementOption{}.WithDisableLog()) + if err != nil { + return nil, err + } + defer rs.Close() + + tableIDs := make([]uint64, 0) + rs.ReadRows(func(rows int, cols []*vector.Vector) bool { + if rows == 0 { + return true + } + tableIDs = append(tableIDs, executor.GetFixedRows[uint64](cols[0])...) + return true + }) + return tableIDs, nil +} + +func (c *Compile) dataBranchTablesHaveActiveReference(tableIDs []uint64) (bool, error) { + if len(tableIDs) == 0 { + return false, nil + } + const batchSize = 512 + for start := 0; start < len(tableIDs); start += batchSize { + end := start + batchSize + if end > len(tableIDs) { + end = len(tableIDs) + } + idList := dataBranchUintListSQL(tableIDs[start:end]) + sql := fmt.Sprintf( + "select table_id from mo_catalog.mo_branch_metadata where table_deleted = false and (table_id in (%s) or p_table_id in (%s)) limit 1", + idList, + idList, + ) + rs, err := c.runSqlWithResultAndOptions(sql, int32(catalog.System_Account), executor.StatementOption{}.WithDisableLog()) + if err != nil { + return false, err + } + hasRef := false + rs.ReadRows(func(rows int, cols []*vector.Vector) bool { + hasRef = rows > 0 + return false + }) + rs.Close() + if hasRef { + return true, nil + } + } + return false, nil +} + +func (c *Compile) cleanupDataBranchMoCatalogPitrAfterDDL() error { + sql := fmt.Sprintf( + "delete from mo_catalog.mo_pitr where pitr_name = '%s' and create_account = 0 and kind = '%s' and not exists (select 1 from mo_catalog.mo_pitr where pitr_name != '%s')", + dataBranchSysMoCatalogPitrName, + dataBranchInternalPitrKind, + dataBranchSysMoCatalogPitrName, + ) + return c.runSqlWithSystemTenant(sql) +} + +func dataBranchUintListSQL(ids []uint64) string { + var sqlBuilder strings.Builder + for i, id := range ids { + if i > 0 { + sqlBuilder.WriteByte(',') + } + sqlBuilder.WriteString(strconv.FormatUint(id, 10)) + } + return sqlBuilder.String() +} + +func quoteCompileSQLStringLiteral(s string) string { + return "'" + strings.ReplaceAll(s, "'", "''") + "'" +} + func (s *Scope) CreateView(c *Compile) error { if s.ScopeAnalyzer == nil { s.ScopeAnalyzer = NewScopeAnalyzer() @@ -3117,7 +3365,7 @@ func (s *Scope) dropTableSingle(c *Compile, qry *plan.DropTable) error { if !needSkipDbs[dbName] { now := c.proc.GetTxnOperator().SnapshotTS().ToStdTime().UTC().UnixNano() updatePitrSql := fmt.Sprintf( - "update `%s`.`%s` set `%s` = %d, `%s` = %d where `%s` = %d and `%s` = '%s' and `%s` = '%s' and `%s` = %d and `%s` = %d", + "update `%s`.`%s` set `%s` = %d, `%s` = %d where `%s` = %d and `%s` = '%s' and `%s` = '%s' and `%s` = %d and `%s` = %d and `kind` = 'user' and `pitr_name` not like '__mo_data_branch_pitr_%%'", catalog.MO_CATALOG, catalog.MO_PITR, catalog.MO_PITR_STATUS, 0, catalog.MO_PITR_CHANGED_TIME, now, @@ -3157,6 +3405,10 @@ func (s *Scope) dropTableSingle(c *Compile, qry *plan.DropTable) error { } } + if err = c.cleanupDataBranchInternalPitrsAfterDDL(); err != nil { + return err + } + ps := partitionservice.GetService(c.proc.GetService()) extr := rel.GetExtraInfo() if extr == nil || @@ -4182,6 +4434,9 @@ func (s *Scope) CreatePitr(c *Compile) error { createPitr := s.Plan.GetDdl().GetCreatePitr() pitrName := createPitr.GetName() pitrLevel := tree.PitrLevel(createPitr.GetLevel()) + if pitrName == "__mo_data_branch_pitr" || strings.HasPrefix(pitrName, "__mo_data_branch_pitr_") { + return moerr.NewInternalError(c.proc.Ctx, "pitr name is reserved") + } // Get current account info accountId, err := defines.GetAccountId(c.proc.Ctx) @@ -4374,6 +4629,9 @@ func (s *Scope) DropPitr(c *Compile) error { if pitrName == "" { return moerr.NewInternalErrorf(c.proc.Ctx, "pitr name is empty") } + if pitrName == "__mo_data_branch_pitr" || strings.HasPrefix(pitrName, "__mo_data_branch_pitr_") { + return moerr.NewInternalError(c.proc.Ctx, "pitr name is reserved") + } const sysMoCatalogPitr = "sys_mo_catalog_pitr" const sysAccountId = 0 @@ -4384,7 +4642,7 @@ func (s *Scope) DropPitr(c *Compile) error { } // 1. Check if PITR exists - checkSql := fmt.Sprintf("SELECT pitr_id FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d", pitrName, accountId) + checkSql := fmt.Sprintf("SELECT pitr_id FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d AND kind = 'user'", pitrName, accountId) res, err := c.runSqlWithResultAndOptions(checkSql, int32(sysAccountId), executor.StatementOption{}.WithDisableLog()) if err != nil { return err @@ -4398,7 +4656,7 @@ func (s *Scope) DropPitr(c *Compile) error { } // 2. Delete PITR record - deleteSql := fmt.Sprintf("DELETE FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d", pitrName, accountId) + deleteSql := fmt.Sprintf("DELETE FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d AND kind = 'user'", pitrName, accountId) err = c.runSqlWithAccountIdAndOptions(deleteSql, int32(sysAccountId), executor.StatementOption{}.WithDisableLog()) if err != nil { return err @@ -4440,7 +4698,7 @@ func pitrDupError(c *Compile, createPitr *plan.CreatePitr) error { func getSqlForCheckPitrDup( createPitr *plan.CreatePitr, ) string { - sql := "SELECT pitr_id FROM mo_catalog.mo_pitr WHERE create_account = %d" + sql := "SELECT pitr_id FROM mo_catalog.mo_pitr WHERE create_account = %d AND kind = 'user' AND pitr_name not like '__mo_data_branch_pitr_%%'" switch tree.PitrLevel(createPitr.GetLevel()) { case tree.PITRLEVELCLUSTER: return getSqlForCheckDupPitrFormat(createPitr.CurrentAccountId, math.MaxUint64) @@ -4459,7 +4717,7 @@ func getSqlForCheckPitrDup( } func getSqlForCheckDupPitrFormat(accountId uint32, objId uint64) string { - return fmt.Sprintf(`SELECT pitr_id FROM mo_catalog.mo_pitr WHERE create_account = %d AND obj_id = %d;`, accountId, objId) + return fmt.Sprintf(`SELECT pitr_id FROM mo_catalog.mo_pitr WHERE create_account = %d AND obj_id = %d AND kind = 'user' AND pitr_name not like '__mo_data_branch_pitr_%%';`, accountId, objId) } func getPitrObjectId(createPitr *plan.CreatePitr) uint64 { @@ -4527,7 +4785,7 @@ func CheckSysMoCatalogPitrResult(ctx context.Context, vecs []*vector.Vector, new } func getSqlForCheckPitrExists(pitrName string, accountId uint32) string { - return fmt.Sprintf("SELECT pitr_id FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d ORDER BY pitr_id", pitrName, accountId) + return fmt.Sprintf("SELECT pitr_id FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d AND kind = 'user' AND pitr_name not like '__mo_data_branch_pitr_%%' ORDER BY pitr_id", pitrName, accountId) } func (s *Scope) CreateCDC(c *Compile) error { @@ -5284,7 +5542,7 @@ func (c *Compile) checkPitrGranularity( return err } - sqlCluster := fmt.Sprintf(`SELECT pitr_length,pitr_unit FROM %s.%s WHERE level='cluster' AND account_id = %d`, + sqlCluster := fmt.Sprintf(`SELECT pitr_length,pitr_unit FROM %s.%s WHERE level='cluster' AND account_id = %d AND pitr_status = 1 AND kind = 'user' AND pitr_name not like '__mo_data_branch_pitr_%%'`, catalog.MO_CATALOG, catalog.MO_PITR, accountId) if res, err := c.runSqlWithResultAndOptions(sqlCluster, int32(catalog.System_Account), executor.StatementOption{}.WithDisableLog()); err == nil { if len(res.Batches) > 0 && res.Batches[0].Vecs[0].Length() > 0 { @@ -5319,20 +5577,20 @@ func (c *Compile) checkPitrGranularity( } checkDBByName := func(nameLower string) (bool, error) { - qDB := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='database' and lower(database_name) = '%s' and account_id = %d`, + qDB := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='database' and lower(database_name) = '%s' and account_id = %d and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%%'`, catalog.MO_CATALOG, catalog.MO_PITR, nameLower, accountId) if ok, err := checkQuery(qDB); err != nil { return false, err } else if ok { return true, nil } - qDB2 := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='database' and lower(database_name) = '%s'`, + qDB2 := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='database' and lower(database_name) = '%s' and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%%'`, catalog.MO_CATALOG, catalog.MO_PITR, nameLower) return checkQuery(qDB2) } // 1) account level always checked first for any pattern - qAcc := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='account' and account_id = %d`, + qAcc := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='account' and account_id = %d and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%%'`, catalog.MO_CATALOG, catalog.MO_PITR, accountId) if ok, err := checkQuery(qAcc); err != nil { return err @@ -5364,7 +5622,7 @@ func (c *Compile) checkPitrGranularity( // 3) table level only for table pattern if !isDB { - qTbl := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='table' and lower(database_name)='%s' and lower(table_name)='%s' and account_id = %d`, + qTbl := fmt.Sprintf(`select pitr_length,pitr_unit from %s.%s where level='table' and lower(database_name)='%s' and lower(table_name)='%s' and account_id = %d and pitr_status = 1 and kind = 'user' and pitr_name not like '__mo_data_branch_pitr_%%'`, catalog.MO_CATALOG, catalog.MO_PITR, dbNameLower, tblNameLower, accountId) if ok, err := checkQuery(qTbl); err != nil { return err diff --git a/pkg/sql/plan/build_ddl.go b/pkg/sql/plan/build_ddl.go index 45ced634ca55d..0cb8a50e455b1 100644 --- a/pkg/sql/plan/build_ddl.go +++ b/pkg/sql/plan/build_ddl.go @@ -4957,7 +4957,9 @@ func buildCreatePitr(stmt *tree.CreatePitr, ctx CompilerContext) (*Plan, error) } // check pitr exists or not - if string(stmt.Name) == SYSMOCATALOGPITR { + if string(stmt.Name) == SYSMOCATALOGPITR || + string(stmt.Name) == DATA_BRANCH_PITR_NAME_PREFIX || + strings.HasPrefix(string(stmt.Name), DATA_BRANCH_PITR_NAME_PREFIX+"_") { return nil, moerr.NewInternalError(ctx.GetContext(), "pitr name is reserved") } @@ -5031,6 +5033,11 @@ func buildCreatePitr(stmt *tree.CreatePitr, ctx CompilerContext) (*Plan, error) func buildDropPitr(stmt *tree.DropPitr, ctx CompilerContext) (*Plan, error) { ddlType := plan.DataDefinition_DROP_PITR // Remove privilege check, no account ID validation + if string(stmt.Name) == SYSMOCATALOGPITR || + string(stmt.Name) == DATA_BRANCH_PITR_NAME_PREFIX || + strings.HasPrefix(string(stmt.Name), DATA_BRANCH_PITR_NAME_PREFIX+"_") { + return nil, moerr.NewInternalError(ctx.GetContext(), "pitr name is reserved") + } // Build drop pitr plan dropPitr := &plan.DropPitr{ diff --git a/pkg/sql/plan/build_ddl_test.go b/pkg/sql/plan/build_ddl_test.go index 69d7024f4c4a3..bb43896454c78 100644 --- a/pkg/sql/plan/build_ddl_test.go +++ b/pkg/sql/plan/build_ddl_test.go @@ -657,6 +657,19 @@ func TestBuildCreatePitr(t *testing.T) { assert.Contains(t, err.Error(), "pitr name is reserved") }) + t.Run("reserved data branch pitr name", func(t *testing.T) { + ctx := &MockCompilerContext{} + ctx.GetAccountNameFunc = func() string { return "sys" } + ctx.GetAccountIdFunc = func() (uint32, error) { return 1, nil } + stmt := baseStmt() + for _, name := range []string{"__mo_data_branch_pitr", "__mo_data_branch_pitr_table_123"} { + stmt.Name = tree.Identifier(name) + _, err := buildCreatePitr(stmt, ctx) + assert.Error(t, err) + assert.Contains(t, err.Error(), "pitr name is reserved") + } + }) + t.Run("database level pitr, database not exist", func(t *testing.T) { ctx := &MockCompilerContext{} ctx.GetAccountNameFunc = func() string { return "sys" } diff --git a/pkg/sql/plan/build_show.go b/pkg/sql/plan/build_show.go index 674e88db69619..f9ac20850f65f 100644 --- a/pkg/sql/plan/build_show.go +++ b/pkg/sql/plan/build_show.go @@ -37,6 +37,7 @@ const MO_CATALOG_DB_NAME = "mo_catalog" const MO_DEFUALT_HOSTNAME = "localhost" const INFORMATION_SCHEMA = "information_schema" const SYSMOCATALOGPITR = "sys_mo_catalog_pitr" +const DATA_BRANCH_PITR_NAME_PREFIX = "__mo_data_branch_pitr" func buildShowCreateDatabase(stmt *tree.ShowCreateDatabase, ctx CompilerContext) (*Plan, error) { @@ -1002,10 +1003,10 @@ func buildShowPitr(stmt *tree.ShowPitr, ctx CompilerContext) (*Plan, error) { " pitr_length as `PITR_LENGTH`, "+ " pitr_unit as `PITR_UNIT` FROM %s.mo_pitr "+ "where "+ - " create_account = %d and pitr_name != '%s' "+ + " create_account = %d and pitr_name != '%s' and kind = 'user' and pitr_name not like '%s%%' "+ "ORDER BY "+ " create_time DESC", - MO_CATALOG_DB_NAME, curAccountId, SYSMOCATALOGPITR) + MO_CATALOG_DB_NAME, curAccountId, SYSMOCATALOGPITR, DATA_BRANCH_PITR_NAME_PREFIX+"_") newCtx := ctx.GetContext() if curAccountId != catalog.System_Account { diff --git a/pkg/vm/engine/disttae/snapshot_scan.go b/pkg/vm/engine/disttae/snapshot_scan.go index 2bea0fb72e27a..7d1d5ce470d36 100644 --- a/pkg/vm/engine/disttae/snapshot_scan.go +++ b/pkg/vm/engine/disttae/snapshot_scan.go @@ -138,7 +138,7 @@ func ScanSnapshotWithCurrentRanges( zap.Int("disk-block-cnt", diskBlockCnt), ) - tombstones, err := tbl.CollectTombstones(ctx, 0, engine.Policy_CollectCommittedTombstones) + tombstones, err := tbl.collectTombstonesAtSnapshot(ctx, 0, engine.Policy_CollectCommittedTombstones, snapshotTS) if err != nil { return err } diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 51686db1ecefb..212a26f6eeed7 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -543,6 +543,20 @@ func (tbl *txnTable) CollectTombstones( ctx context.Context, txnOffset int, policy engine.TombstoneCollectPolicy, +) (engine.Tombstoner, error) { + return tbl.collectTombstonesAtSnapshot( + ctx, + txnOffset, + policy, + types.TimestampToTS(tbl.db.op.SnapshotTS()), + ) +} + +func (tbl *txnTable) collectTombstonesAtSnapshot( + ctx context.Context, + txnOffset int, + policy engine.TombstoneCollectPolicy, + snapshot types.TS, ) (engine.Tombstoner, error) { tombstone := readutil.NewEmptyTombstoneData() @@ -603,8 +617,7 @@ func (tbl *txnTable) CollectTombstones( return nil, err } { - ts := tbl.db.op.SnapshotTS() - iter := state.NewRowsIter(types.TimestampToTS(ts), nil, true) + iter := state.NewRowsIter(snapshot, nil, true) for iter.Next() { entry := iter.Entry() //bid, o := entry.RowID.Decode() @@ -615,7 +628,6 @@ func (tbl *txnTable) CollectTombstones( //tombstone.SortInMemory() //collect committed persisted tombstones from partition state. - snapshot := types.TimestampToTS(tbl.db.op.Txn().SnapshotTS) err = state.CollectTombstoneObjects(snapshot, func(stats *objectio.ObjectStats) { tombstone.AppendFiles(*stats) From 0968692de2c8db29f380947b90556e900065767f Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Thu, 30 Apr 2026 05:44:11 +0800 Subject: [PATCH 2/4] fix: keep reserved pitr drop-if-exists idempotent --- pkg/frontend/pitr.go | 16 ++++++++++------ pkg/sql/compile/ddl.go | 17 +++++++++++++---- pkg/sql/compile/ddl_test.go | 32 ++++++++++++++++++++++++++++++++ pkg/sql/plan/build_ddl.go | 15 ++++++++------- pkg/sql/plan/build_ddl_test.go | 28 ++++++++++++++++++++++++++++ 5 files changed, 91 insertions(+), 17 deletions(-) diff --git a/pkg/frontend/pitr.go b/pkg/frontend/pitr.go index bf3d849a24407..81b1d6bf22258 100644 --- a/pkg/frontend/pitr.go +++ b/pkg/frontend/pitr.go @@ -781,6 +781,16 @@ func doDropPitr(ctx context.Context, ses *Session, stmt *tree.DropPitr) (err err return err } + // check pitr exists or not + tenantInfo := ses.GetTenantInfo() + if isReservedPitrName(string(stmt.Name)) { + // Reserved PITRs are maintained internally; IF EXISTS keeps cleanup SQL idempotent. + if stmt.IfExists { + return nil + } + return moerr.NewInternalError(ctx, "pitr name is reserved") + } + err = bh.Exec(ctx, "begin;") defer func() { err = finishTxn(ctx, bh, err) @@ -788,12 +798,6 @@ func doDropPitr(ctx context.Context, ses *Session, stmt *tree.DropPitr) (err err if err != nil { return err } - - // check pitr exists or not - tenantInfo := ses.GetTenantInfo() - if isReservedPitrName(string(stmt.Name)) { - return moerr.NewInternalError(ctx, "pitr name is reserved") - } pitrExist, err = checkPitrExistOrNot(ctx, bh, string(stmt.Name), uint64(tenantInfo.GetTenantID())) if err != nil { return err diff --git a/pkg/sql/compile/ddl.go b/pkg/sql/compile/ddl.go index 2d16a7485866c..80d2f528b8450 100644 --- a/pkg/sql/compile/ddl.go +++ b/pkg/sql/compile/ddl.go @@ -1863,6 +1863,12 @@ const ( dataBranchSysMoCatalogPitrName = "sys_mo_catalog_pitr" ) +func isReservedPitrName(pitrName string) bool { + return pitrName == dataBranchSysMoCatalogPitrName || + pitrName == dataBranchInternalPitrNamePrefix || + strings.HasPrefix(pitrName, dataBranchInternalPitrNamePrefix+"_") +} + type dataBranchTablePitrForCleanup struct { accountID uint64 tableID uint64 @@ -4629,10 +4635,13 @@ func (s *Scope) DropPitr(c *Compile) error { if pitrName == "" { return moerr.NewInternalErrorf(c.proc.Ctx, "pitr name is empty") } - if pitrName == "__mo_data_branch_pitr" || strings.HasPrefix(pitrName, "__mo_data_branch_pitr_") { + if isReservedPitrName(pitrName) { + // Reserved PITRs are maintained internally; IF EXISTS keeps cleanup SQL idempotent. + if dropPitr.GetIfExists() { + return nil + } return moerr.NewInternalError(c.proc.Ctx, "pitr name is reserved") } - const sysMoCatalogPitr = "sys_mo_catalog_pitr" const sysAccountId = 0 // Get current account @@ -4663,7 +4672,7 @@ func (s *Scope) DropPitr(c *Compile) error { } // 3. Check if there are other PITR records besides sys_mo_catalog_pitr - checkOtherSql := fmt.Sprintf("SELECT pitr_id FROM mo_catalog.mo_pitr WHERE pitr_name != '%s'", sysMoCatalogPitr) + checkOtherSql := fmt.Sprintf("SELECT pitr_id FROM mo_catalog.mo_pitr WHERE pitr_name != '%s'", dataBranchSysMoCatalogPitrName) otherRes, err := c.runSqlWithResultAndOptions(checkOtherSql, sysAccountId, executor.StatementOption{}.WithDisableLog()) if err != nil { return err @@ -4671,7 +4680,7 @@ func (s *Scope) DropPitr(c *Compile) error { defer otherRes.Close() if len(otherRes.Batches) == 0 || otherRes.Batches[0].RowCount() == 0 { // 4. No other PITR records, delete sys_mo_catalog_pitr - deleteSysSql := fmt.Sprintf("DELETE FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d", sysMoCatalogPitr, sysAccountId) + deleteSysSql := fmt.Sprintf("DELETE FROM mo_catalog.mo_pitr WHERE pitr_name = '%s' AND create_account = %d", dataBranchSysMoCatalogPitrName, sysAccountId) err = c.runSqlWithAccountIdAndOptions(deleteSysSql, sysAccountId, executor.StatementOption{}.WithDisableLog()) if err != nil { return err diff --git a/pkg/sql/compile/ddl_test.go b/pkg/sql/compile/ddl_test.go index 9eecf7c2390f3..5d5a9aa4db7df 100644 --- a/pkg/sql/compile/ddl_test.go +++ b/pkg/sql/compile/ddl_test.go @@ -867,6 +867,38 @@ func TestPitrDupError(t *testing.T) { } } +func TestDropPitrReservedName(t *testing.T) { + newDropPitrScope := func(name string, ifExists bool) *Scope { + return &Scope{Plan: &plan2.Plan{Plan: &plan2.Plan_Ddl{Ddl: &plan2.DataDefinition{ + Definition: &plan2.DataDefinition_DropPitr{ + DropPitr: &plan2.DropPitr{ + Name: name, + IfExists: ifExists, + }, + }, + }}}} + } + + compile := &Compile{proc: testutil.NewProc(t)} + for _, name := range []string{ + dataBranchSysMoCatalogPitrName, + dataBranchInternalPitrNamePrefix, + dataBranchInternalPitrNamePrefix + "_table_123", + } { + name := name + t.Run(name+"/without_if_exists", func(t *testing.T) { + err := newDropPitrScope(name, false).DropPitr(compile) + assert.Error(t, err) + assert.Contains(t, err.Error(), "pitr name is reserved") + }) + + t.Run(name+"/with_if_exists", func(t *testing.T) { + err := newDropPitrScope(name, true).DropPitr(compile) + assert.NoError(t, err) + }) + } +} + func TestIsExperimentalEnabled(t *testing.T) { s := newScope(TableClone) diff --git a/pkg/sql/plan/build_ddl.go b/pkg/sql/plan/build_ddl.go index 0cb8a50e455b1..4e3ddeda21edc 100644 --- a/pkg/sql/plan/build_ddl.go +++ b/pkg/sql/plan/build_ddl.go @@ -4956,10 +4956,7 @@ func buildCreatePitr(stmt *tree.CreatePitr, ctx CompilerContext) (*Plan, error) return nil, moerr.NewInternalErrorf(ctx.GetContext(), "invalid pitr unit %s", pitrUnit) } - // check pitr exists or not - if string(stmt.Name) == SYSMOCATALOGPITR || - string(stmt.Name) == DATA_BRANCH_PITR_NAME_PREFIX || - strings.HasPrefix(string(stmt.Name), DATA_BRANCH_PITR_NAME_PREFIX+"_") { + if isReservedPitrName(string(stmt.Name)) { return nil, moerr.NewInternalError(ctx.GetContext(), "pitr name is reserved") } @@ -5033,9 +5030,7 @@ func buildCreatePitr(stmt *tree.CreatePitr, ctx CompilerContext) (*Plan, error) func buildDropPitr(stmt *tree.DropPitr, ctx CompilerContext) (*Plan, error) { ddlType := plan.DataDefinition_DROP_PITR // Remove privilege check, no account ID validation - if string(stmt.Name) == SYSMOCATALOGPITR || - string(stmt.Name) == DATA_BRANCH_PITR_NAME_PREFIX || - strings.HasPrefix(string(stmt.Name), DATA_BRANCH_PITR_NAME_PREFIX+"_") { + if isReservedPitrName(string(stmt.Name)) && !stmt.IfExists { return nil, moerr.NewInternalError(ctx.GetContext(), "pitr name is reserved") } @@ -5057,6 +5052,12 @@ func buildDropPitr(stmt *tree.DropPitr, ctx CompilerContext) (*Plan, error) { }, nil } +func isReservedPitrName(pitrName string) bool { + return pitrName == SYSMOCATALOGPITR || + pitrName == DATA_BRANCH_PITR_NAME_PREFIX || + strings.HasPrefix(pitrName, DATA_BRANCH_PITR_NAME_PREFIX+"_") +} + func buildCreateCDC(stmt *tree.CreateCDC, ctx CompilerContext) (*Plan, error) { accountId, err := ctx.GetAccountId() if err != nil { diff --git a/pkg/sql/plan/build_ddl_test.go b/pkg/sql/plan/build_ddl_test.go index bb43896454c78..a647d5c40bf0f 100644 --- a/pkg/sql/plan/build_ddl_test.go +++ b/pkg/sql/plan/build_ddl_test.go @@ -728,6 +728,34 @@ func TestBuildCreatePitr(t *testing.T) { }) } +func TestBuildDropPitrReservedName(t *testing.T) { + ctx := &MockCompilerContext{} + for _, name := range []string{ + SYSMOCATALOGPITR, + DATA_BRANCH_PITR_NAME_PREFIX, + DATA_BRANCH_PITR_NAME_PREFIX + "_table_123", + } { + name := name + t.Run(name+"/without_if_exists", func(t *testing.T) { + stmt := &tree.DropPitr{Name: tree.Identifier(name)} + _, err := buildDropPitr(stmt, ctx) + assert.Error(t, err) + assert.Contains(t, err.Error(), "pitr name is reserved") + }) + + t.Run(name+"/with_if_exists", func(t *testing.T) { + stmt := &tree.DropPitr{Name: tree.Identifier(name), IfExists: true} + built, err := buildDropPitr(stmt, ctx) + assert.NoError(t, err) + require.NotNil(t, built) + dropPitr := built.GetDdl().GetDropPitr() + require.NotNil(t, dropPitr) + assert.True(t, dropPitr.GetIfExists()) + assert.Equal(t, name, dropPitr.GetName()) + }) + } +} + func TestConstructAddedPartitionDefsErrors(t *testing.T) { ctx := NewEmptyCompilerContext() ctx.SetContext(context.Background()) From a8ca5ac1cba2ac545c19d4b3d6c98fe852325842 Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Thu, 30 Apr 2026 20:36:17 +0800 Subject: [PATCH 3/4] docs: add research citation reference --- README.md | 18 ++++++++++++++++++ README_CN.md | 18 ++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/README.md b/README.md index 753777b083ee3..7bcead6f5fa97 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,7 @@ Contents * [Architecture](#architecture) * [Python SDK](#python-sdk) * [Contributing](#contributing) +* [Reference](#reference) * [License](#license) ## What is MatrixOne? @@ -1091,6 +1092,23 @@ Contributions to MatrixOne are welcome from everyone. +## Reference + +Reference to cite when you use MatrixOne in a research paper: + +```bibtex +@misc{gou2026versioncontrolsystemdata, + title={Version Control System for Data with MatrixOne}, + author={Gou, Hongshen and Tian, Feng and Wang, Long and Deng, Nan and Xu, Peng}, + year={2026}, + eprint={2604.03927}, + archivePrefix={arXiv}, + primaryClass={cs.DB}, + doi={10.48550/arXiv.2604.03927}, + url={https://arxiv.org/abs/2604.03927} +} +``` + ## License MatrixOne is licensed under the [Apache License, Version 2.0](LICENSE). diff --git a/README_CN.md b/README_CN.md index 304ecd87054e4..fbe10f9658410 100644 --- a/README_CN.md +++ b/README_CN.md @@ -57,6 +57,7 @@ * [架构](#architecture) * [Python SDK](#python-sdk) * [参与贡献](#contributing) +* [参考引用](#reference) * [License](#license) ## MatrixOne 是什么? @@ -1085,6 +1086,23 @@ MatrixOne 提供**全面的 Python SDK**,支持数据库操作、向量搜索 欢迎大家对 MatrixOne 的贡献。 请查看[贡献指南](https://docs.matrixorigin.cn/latest/MatrixOne/Contribution-Guide/make-your-first-contribution/)来了解有关提交补丁和完成整个贡献流程的详细信息。 +## 参考引用 + +如果你在研究论文中使用 MatrixOne,请引用: + +```bibtex +@misc{gou2026versioncontrolsystemdata, + title={Version Control System for Data with MatrixOne}, + author={Gou, Hongshen and Tian, Feng and Wang, Long and Deng, Nan and Xu, Peng}, + year={2026}, + eprint={2604.03927}, + archivePrefix={arXiv}, + primaryClass={cs.DB}, + doi={10.48550/arXiv.2604.03927}, + url={https://arxiv.org/abs/2604.03927} +} +``` + ## License [Apache License, Version 2.0](LICENSE)。 From 43d045cb9c866b6245f108c5a5f642313402440d Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Thu, 30 Apr 2026 20:42:26 +0800 Subject: [PATCH 4/4] Revert "docs: add research citation reference" This reverts commit a8ca5ac1cba2ac545c19d4b3d6c98fe852325842. --- README.md | 18 ------------------ README_CN.md | 18 ------------------ 2 files changed, 36 deletions(-) diff --git a/README.md b/README.md index 7bcead6f5fa97..753777b083ee3 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,6 @@ Contents * [Architecture](#architecture) * [Python SDK](#python-sdk) * [Contributing](#contributing) -* [Reference](#reference) * [License](#license) ## What is MatrixOne? @@ -1092,23 +1091,6 @@ Contributions to MatrixOne are welcome from everyone. -## Reference - -Reference to cite when you use MatrixOne in a research paper: - -```bibtex -@misc{gou2026versioncontrolsystemdata, - title={Version Control System for Data with MatrixOne}, - author={Gou, Hongshen and Tian, Feng and Wang, Long and Deng, Nan and Xu, Peng}, - year={2026}, - eprint={2604.03927}, - archivePrefix={arXiv}, - primaryClass={cs.DB}, - doi={10.48550/arXiv.2604.03927}, - url={https://arxiv.org/abs/2604.03927} -} -``` - ## License MatrixOne is licensed under the [Apache License, Version 2.0](LICENSE). diff --git a/README_CN.md b/README_CN.md index fbe10f9658410..304ecd87054e4 100644 --- a/README_CN.md +++ b/README_CN.md @@ -57,7 +57,6 @@ * [架构](#architecture) * [Python SDK](#python-sdk) * [参与贡献](#contributing) -* [参考引用](#reference) * [License](#license) ## MatrixOne 是什么? @@ -1086,23 +1085,6 @@ MatrixOne 提供**全面的 Python SDK**,支持数据库操作、向量搜索 欢迎大家对 MatrixOne 的贡献。 请查看[贡献指南](https://docs.matrixorigin.cn/latest/MatrixOne/Contribution-Guide/make-your-first-contribution/)来了解有关提交补丁和完成整个贡献流程的详细信息。 -## 参考引用 - -如果你在研究论文中使用 MatrixOne,请引用: - -```bibtex -@misc{gou2026versioncontrolsystemdata, - title={Version Control System for Data with MatrixOne}, - author={Gou, Hongshen and Tian, Feng and Wang, Long and Deng, Nan and Xu, Peng}, - year={2026}, - eprint={2604.03927}, - archivePrefix={arXiv}, - primaryClass={cs.DB}, - doi={10.48550/arXiv.2604.03927}, - url={https://arxiv.org/abs/2604.03927} -} -``` - ## License [Apache License, Version 2.0](LICENSE)。