Cross-Cluster Physical Replication#23532
The nil check at line 992 is redundant because line 958 already returns an error when upstreamSQLHelperFactory is nil. The govet nilness analyzer flags this as a tautological condition. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Implements Cross-Cluster Physical Replication (CCPR) support by adding daemon-task bootstrap/upgrade entries, CCPR catalog tables, and frontend “publication internal commands” to fetch snapshot metadata, DDL, object lists, and object chunks for replication workflows.
Changes:
- Add CCPR configuration + global config plumbing and extensive publication package coverage tests.
- Add frontend internal command parsing/execution paths for snapshot TS / databases / DDL / object list / object chunk retrieval, plus auth gating.
- Add bootstrap/upgrade + predefined SQL and new mo_catalog tables for CCPR state tracking.
Reviewed changes
Copilot reviewed 53 out of 199 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| pkg/publication/coverage_extra_test.go | Adds coverage tests for publication executor/iteration/DDL/object filtering error paths. |
| pkg/publication/coverage_exec_test.go | Adds mock-based coverage for SQL executor retry/scan and conn parsing. |
| pkg/publication/coverage_ddl_test.go | Adds DDL diff and upstream DDL query coverage tests. |
| pkg/publication/config_test.go | Adds tests for CCPR config defaults/validation/global setters. |
| pkg/publication/config.go | Introduces CCPRConfig + global config accessors/constants for chunk sizing. |
| pkg/predefine/predefine.go | Adds SQL generators for publication daemon task init/check. |
| pkg/objectio/ioutil/writer.go | Clarifies SchemaData usage for ZM updates in BlockWriter. |
| pkg/objectio/injects_coverage_test.go | Adds tests for new fault-injection helpers. |
| pkg/objectio/injects.go | Adds fault points for publication snapshot finished + upstream SQL helper. |
| pkg/frontend/util_test.go | Adds tests for new internal command parsers. |
| pkg/frontend/util.go | Adds parsing/building helpers for publication internal commands. |
| pkg/frontend/types_test.go | Adds unit tests covering new internal command statement types and allocator behavior. |
| pkg/frontend/types.go | Adds new internal command statement types, fprints, and allocator safety tweaks. |
| pkg/frontend/txn_test.go | Extends test workspace with CCPR-related txn methods. |
| pkg/frontend/test/txn_mock.go | Updates gomock workspace to include CCPR-related methods. |
| pkg/frontend/test/engine_mock.go | Extends relation mock with CollectObjectList/GetFlushTS APIs. |
| pkg/frontend/stmt_kind.go | Allows new internal commands in uncommitted transaction context. |
| pkg/frontend/snapshot.go | Adds publication-based snapshot create/drop handling and publication permission resolution helpers. |
| pkg/frontend/self_handle.go | Routes new internal commands and CCPR subscription statements through frontend executor. |
| pkg/frontend/publication_internal_auth_test.go | Adds tests for internal-command auth gating. |
| pkg/frontend/publication_internal_auth.go | Adds sys-tenant-only access gate for publication internal commands. |
| pkg/frontend/predefined.go | Adds mo_ccpr_* table DDL definitions. |
| pkg/frontend/object_list.go | Implements OBJECTLIST handling + internal object list command and engine-based object list collection. |
| pkg/frontend/mysql_cmd_executor_test.go | Adds tests ensuring internal commands are parsed into the right AST nodes. |
| pkg/frontend/mysql_cmd_executor.go | Wires internal command parsing and CCPR subscription/show handlers. |
| pkg/frontend/get_object.go | Implements GET OBJECT + internal getobject with chunking, pooling, and semaphore limiting. |
| pkg/frontend/get_ddl_test.go | Adds tests for DDL batch generation and snapshot covered-scope logic. |
| pkg/frontend/get_ddl.go | Adds DDL batch compute APIs and publication permission helpers. |
| pkg/frontend/check_snapshot_flushed.go | Adds CHECK SNAPSHOT FLUSHED handler + exported helpers and CCPR flush checking. |
| pkg/frontend/back_exec.go | Adds internal command parsing support for background execution. |
| pkg/frontend/authenticate.go | Adds mo_ccpr_* tables to system tables/DDL list and drop-account cleanup; updates privilege mapping. |
| pkg/fileservice/minio_sdk_test.go | Improves MinIO test reliability/cleanup and bounded retries. |
| pkg/defines/type.go | Adds SkipTransferKey marker type for CCPR tombstone handling. |
| pkg/common/moerr/sync_protection_error_test.go | Adds tests for new sync-protection validation error classification. |
| pkg/common/moerr/error_no_ctx.go | Adds NoCtx constructors for sync-protection-related errors. |
| pkg/common/moerr/error.go | Adds new sync-protection error codes/messages + ErrCCPRReadOnly and helper predicates. |
| pkg/cnservice/server_task.go | Registers publication task executor in CN service. |
| pkg/cdc/util.go | Exposes AesCFBEncodeWithKey wrapper. |
| pkg/catalog/types.go | Adds PropFromPublication and defines CCPR catalog table names + schemas. |
| pkg/bootstrap/versions/v4_0_0/cluster_upgrade_list.go | Adds upgrade entries for CCPR tables and publication daemon task. |
| pkg/bootstrap/service.go | Adds publication daemon task init SQL to bootstrap. |
… tables Remove mo_ccpr_log, mo_ccpr_tables, mo_ccpr_dbs from sysWantedTables and predefinedTables. This makes isClusterTable() return true for them, converting them from per-account tables to cluster tables (one per cluster, with data filtered by account_id). The tables remain in createSqls (for sys account bootstrap) and in isSysOnlyDb (to skip during non-sys tenant creation). The cluster upgrade entries in cluster_upgrade_list.go are also preserved. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- config.go: fix GetGlobalConfig race by replacing sync.Once with double-checked locking
- config.go: clamp GetChunkMaxConcurrent to minimum 1
- get_object.go: fix misleading comments, add explicit int64->int conversions
- object_list.go: return error on snapshot resolution failure instead of silent fallback
- snapshot.go: use escapeSQLString for SQL injection prevention
- check_snapshot_flushed.go: use escapeSQLString for SQL injection prevention
- get_ddl.go: fix batch column names to match MySQL result set names
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Rename 26 test files to use descriptive names based on what they actually test, removing the 'coverage' suffix/prefix that implies they exist solely for coverage purposes. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
escapeSQLString wraps strings with quotes, breaking SQL format strings that already have surrounding quotes. Use strings.ReplaceAll for inline quote escaping instead, consistent with the rest of the codebase. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
XuPeng-SH left a comment
🔍 Cross-Cluster Physical Replication (CCPR) — Multi-Angle Code Review
This is a deep review of the ~70K-line CCPR feature across security, correctness, concurrency, observability, and testing dimensions.
🔴 CRITICAL — Must Fix Before Merge
C1. SQL Injection — publication_subscription.go (Pervasive)
Dozens of SQL statements use fmt.Sprintf with unescaped user input. This contrasts with sql_builder.go which properly uses escapeSQLString().
Affected functions include:
- getPubInfo(): pubName unescaped in WHERE
- getSqlForUpdatePubInfo(): dbName, comment, pubName, accountList all unescaped
- insertMoSubs(): SubAccountName, PubAccountName, PubName unescaped
- batchUpdateMoSubs(): pubDbName, pubTables, pubComment, pubAccountName, pubName
- getPubInfosByDbname(), checkColExists(), getPubInfos(), doShowSubscriptions()
- checkUpstreamPublicationCoverage(): pubName in SHOW command
Risk: Tenant names are user-controlled. A malicious tenant name like x' OR '1'='1 could bypass authorization or exfiltrate data.
Fix: Use escapeSQLString() consistently or switch to parameterized queries.
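A minimal sketch of the escaping option, assuming a MySQL-style dialect where both backslash and single quote need escaping inside a string literal. `quoteSQLLiteral` and the table name `example_pubs` are hypothetical and are not the project's `escapeSQLString`. Because the helper adds the surrounding quotes itself, the format string must not carry its own.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteSQLLiteral is a hypothetical helper for illustration; it is not the
// project's escapeSQLString. It assumes a MySQL-style dialect in which both
// backslash and single quote must be escaped inside a string literal.
func quoteSQLLiteral(s string) string {
	s = strings.ReplaceAll(s, `\`, `\\`)
	s = strings.ReplaceAll(s, `'`, `''`)
	return "'" + s + "'"
}

func main() {
	pubName := "x' OR '1'='1"
	// example_pubs is a made-up table name. The %s verb is left unquoted
	// because the helper supplies the surrounding quotes.
	sql := fmt.Sprintf("select * from example_pubs where pub_name = %s", quoteSQLLiteral(pubName))
	fmt.Println(sql)
}
```

Where the executor supports bound parameters, those are generally preferable to any hand-written escaping.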
C2. SQL Injection — object_list.go:ResolveSnapshotWithSnapshotNameWithoutSession
sql := fmt.Sprintf(`select ts from mo_catalog.mo_snapshots where sname = '%s' ...`, snapshotName)
The snapshot name is interpolated directly, without escaping.
C3. SQL Injection — sql_builder.go Snapshot Name Backtick Escape
PublicationCreateCcprSnapshotForAccountSqlTemplate = "CREATE SNAPSHOT IF NOT EXISTS `%s` FOR ACCOUNT..."
snapshotName is backtick-quoted, but backticks within the name are not escaped. A snapshot name containing ` breaks out of identifier quoting.
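One common way to close this gap is to double any embedded backtick before wrapping the identifier, which is the MySQL convention for quoted identifiers. The helper name `quoteIdent` below is hypothetical; it sketches the idea and is not code from this PR.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent is a hypothetical helper: it doubles embedded backticks and
// wraps the name in backticks, following MySQL identifier quoting rules.
func quoteIdent(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

func main() {
	// A name containing a backtick stays inside the quoted identifier.
	fmt.Println(quoteIdent("snap`; drop"))
}
```

With such a helper the template would take `%s` without literal backticks around it, since the helper supplies them.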
C4. Race Condition — executor.go: run() vs Stop() on exec.worker
run() calls exec.worker.Submit() without lock, while Stop() sets exec.worker = nil under runningMu. Concurrent Stop() during submission causes nil pointer panic.
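A sketch of one possible fix: read the worker under the same mutex that Stop() uses, then submit through the local copy. The `worker` interface, `inlineWorker`, `submit`, and `errStopped` are all hypothetical stand-ins; only the field names `worker` and `runningMu` come from the review text.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for the pool type used by the executor.
type worker interface{ Submit(func()) error }

// inlineWorker runs tasks synchronously; it exists only for this demo.
type inlineWorker struct{}

func (inlineWorker) Submit(f func()) error { f(); return nil }

var errStopped = errors.New("executor stopped")

type executor struct {
	runningMu sync.Mutex
	worker    worker
}

// submit snapshots the worker under the lock so a concurrent Stop cannot
// turn the field into nil between the check and the call.
func (e *executor) submit(task func()) error {
	e.runningMu.Lock()
	w := e.worker
	e.runningMu.Unlock()
	if w == nil {
		return errStopped
	}
	return w.Submit(task)
}

func (e *executor) Stop() {
	e.runningMu.Lock()
	e.worker = nil
	e.runningMu.Unlock()
}

func main() {
	e := &executor{worker: inlineWorker{}}
	fmt.Println(e.submit(func() {}))
	e.Stop()
	fmt.Println(e.submit(func() {}))
}
```

This removes the nil dereference, but a task can still be submitted just after Stop() clears the field, so the real worker would need to tolerate a late Submit.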
C5. Unrecoverable Panic — executor.go:ExecWithResult
v, ok := moruntime.ServiceRuntime(cnUUID).GetGlobalVariables(moruntime.InternalSQLExecutor)
if !ok {
panic("missing internal sql executor") // crashes entire executor goroutine
}
Called from run() with no panic recovery. Should return an error, not panic.
C6. Goroutine Self-Deadlock — executor.go:run()
go exec.Stop() // spawns goroutine that calls exec.wg.Wait()
break // run() hasn't called wg.Done() yet!
Stop() blocks in wg.Wait(), but run() is the goroutine that calls wg.Done(). Deadlock.
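One way to keep the loop goroutine from ever waiting on itself is to split "signal stop" from "wait for exit": the loop only signals and returns, while external callers signal and then wait. The sketch below uses hypothetical names (`loop`, `signalStop`, `step`) and a busy loop where the real code would block on tickers.

```go
package main

import (
	"fmt"
	"sync"
)

type loop struct {
	wg       sync.WaitGroup
	stopOnce sync.Once
	stopCh   chan struct{}
	step     func() bool // returns false when the loop must shut down
}

func newLoop(step func() bool) *loop {
	return &loop{stopCh: make(chan struct{}), step: step}
}

func (l *loop) Start() {
	l.wg.Add(1)
	go l.run()
}

// signalStop never blocks, so it is safe to call from inside run().
func (l *loop) signalStop() { l.stopOnce.Do(func() { close(l.stopCh) }) }

// Stop is for external callers only: it signals, then waits for run() to exit.
func (l *loop) Stop() {
	l.signalStop()
	l.wg.Wait()
}

func (l *loop) run() {
	defer l.wg.Done() // always runs, whichever path exits the loop
	for {
		select {
		case <-l.stopCh:
			return
		default:
		}
		if !l.step() {
			l.signalStop() // signal only; never wait on our own WaitGroup
			return
		}
	}
}

func main() {
	n := 0
	l := newLoop(func() bool { n++; return n < 3 })
	l.Start()
	l.Stop()
	fmt.Println("loop exited cleanly")
}
```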
C7. OnTxnCommit Called Before Actual Commit — Data Leak
In txn.go, OnTxnCommit is called at the top of Commit(), before mergeTxnWorkspaceLocked/dumpBatchLocked/TN precommit. If commit subsequently fails, the cache entry is already deleted and objects become unprotected. Rollback won't find them either → orphaned S3 objects never cleaned up.
Fix: Move OnTxnCommit to after commit succeeds, or into a post-commit callback.
C8. BTree Iterator Leak — logtailreplay/object_list.go
CollectObjectList and CollectSnapshotObjectList call state.dataObjectsNameIndex.Iter() / state.tombstoneObjectsNameIndex.Iter() but never call iter.Release(). This leaks BTree read-locks and can block writers indefinitely. Compare txn_table.go:266 which correctly does defer func() { _ = iter.Close() }().
C9. ~30 Prometheus Metrics Defined But Never Registered
In pkg/util/metric/v2/ccpr.go, approximately 30 memory/pool metrics are defined (gauges, counters, histograms) but initCCPRMetrics() does not register them with the Prometheus registry. These metrics are silently invisible in production — no monitoring, no alerting.
🟠 HIGH — Should Fix
H1. AObjectMap Race Condition
AObjectMap is a Go map mutated in ApplyObjects() and read during tombstone rowid rewriting, without synchronization. Unsynchronized concurrent reads and writes of a Go map are a data race (a fatal runtime error or silent corruption).
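A minimal sketch of guarding such a map with sync.RWMutex. The `objectMap` type and its string keys are assumptions made for illustration; the real AObjectMap key and value types are not shown in this review.

```go
package main

import (
	"fmt"
	"sync"
)

// objectMap is a hypothetical wrapper; key and value types are assumed.
type objectMap struct {
	mu sync.RWMutex
	m  map[string]string
}

func newObjectMap() *objectMap { return &objectMap{m: make(map[string]string)} }

// Set records an upstream-to-downstream mapping under the write lock.
func (o *objectMap) Set(upstream, downstream string) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.m[upstream] = downstream
}

// Get takes the read lock; ok is false when no mapping exists, so callers
// can surface unmapped objects instead of skipping them silently.
func (o *objectMap) Get(upstream string) (downstream string, ok bool) {
	o.mu.RLock()
	defer o.mu.RUnlock()
	downstream, ok = o.m[upstream]
	return
}

func main() {
	om := newObjectMap()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			om.Set(fmt.Sprintf("up-%d", i), fmt.Sprintf("down-%d", i))
			om.Get("up-0")
		}(i)
	}
	wg.Wait()
	fmt.Println(om.Get("up-3"))
}
```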
H2. Tombstone Rowid Rewriting Silently Skips Unmapped Objects
In filter_object.go, if aobjectMap doesn't have a mapping for a deleted upstream appendable object, the tombstone's rowid is left pointing to the upstream object UUID — which doesn't exist on downstream. Silent data inconsistency: deleted rows may still appear on replica.
H3. Partial State Commits Without Idempotency
In ExecuteIteration, object application and state persistence are not atomic. If the process crashes between applying objects and persisting iterationState, the next restart replays the same objects. Without idempotency keys, this can cause duplicate data or failures.
H4. Resource Leaks in ddl.go Error Paths
Multiple ExecSQLInDatabase() calls don't call result.Close() / cancel() on error:
result, cancel, err := iterationCtx.LocalExecutor.ExecSQLInDatabase(...)
if err != nil {
return err // result/cancel leaked!
}
H5. MemoryController.Free() Semaphore Mismatch
func (mc *MemoryController) Free(data []byte, memType MemoryType) {
if size > 10*1024*1024 {
select {
case <-mc.largeSem:
default: // silently skip!
}
}
}
Alloc checks size > 10MB at allocation time, but Free checks len(data), which could differ due to mpool growth. Also, if Alloc was context-cancelled before acquiring the semaphore, Free still tries to release it. Over time, the semaphore may leak slots.
H6. gcObjectsAsync is Synchronous
ccpr_txn_cache.go:gcObjectsAsync() — Despite the name and the allocated gcPool *ants.Pool, the function calls fs.Delete() synchronously. The gcPool is never used. GC file deletions block OnTxnRollback.
H7. Connection String @ Split Breaks Passwords
parts := strings.Split(connStr, "@")
if len(parts) != 2 { return error }
strings.Split on @ breaks if the password contains @. Should use strings.LastIndex(connStr, "@") or URL-encode the password.
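A sketch of the LastIndex approach, assuming the connection string has the general shape `credentials@address` and that the address part never contains `@`. The function name `splitConn` and the sample string are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitConn splits on the last '@' so that '@' characters inside the
// credentials (for example in a password) stay on the left-hand side.
func splitConn(connStr string) (creds, addr string, err error) {
	i := strings.LastIndex(connStr, "@")
	if i < 0 {
		return "", "", errors.New("missing @ separator")
	}
	return connStr[:i], connStr[i+1:], nil
}

func main() {
	creds, addr, err := splitConn("user:p@ss@127.0.0.1:6001")
	fmt.Println(creds, addr, err)
}
```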
H8. Missing Panic Recovery in run()
The main run() goroutine in executor.go has no defer func() { recover() }(). Any panic kills the replication loop permanently.
H9. Ticker Leak in executor.go:run()
syncTaskTrigger := time.NewTicker(exec.option.SyncTaskInterval)
gcTrigger := time.NewTicker(exec.option.GCInterval)
// Never stopped!
Missing defer syncTaskTrigger.Stop() / defer gcTrigger.Stop().
H10. context.Background() Passed to run() — Dead Code
go exec.run(context.Background())
Inside run(), ctx.Done() is checked but will never fire because the context is context.Background(). The first select case is dead code.
H11. Transaction Committed After Failed Init
In executor.go:applyCcprLog:
txnOp, err := exec.cnTxnClient.New(ctx, nowTs, createByOpt)
if txnOp != nil {
defer txnOp.Commit(ctx) // commit set before error check!
}
err = exec.txnEngine.New(ctx, txnOp) // if this fails, still commits
H12. Silent Error Swallowing in Cleanup Paths
In iteration.go, when UpdateIterationState fails after a successful object application, the original error context is lost. The function logs a warning and continues, leaving inconsistent state.
🟡 MEDIUM
M1. DDL Drop-Create Without Atomicity
ALTER operations do dropTable() then createTable(). If create fails after drop succeeds, the table is permanently gone on downstream.
M2. Inconsistent SQL Escaping Across Files
sql_builder.go uses escapeSQLString(). publication_subscription.go uses strings.ReplaceAll(x, "'", "''") in some places, nothing in others. object_list.go uses nothing. Should standardize.
M3. DefaultClassifier.IsRetryable() Too Broad
Pattern matching on substrings like "timeout", "backend", "unavailable" can match non-transient errors (e.g., "this feature is unavailable").
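One alternative to substring matching is to classify on error identity and behavior. The sketch below assumes the transient conditions can be expressed as sentinel errors or as net.Error timeouts; `errBackendUnavailable`, `errFeatureUnavailable`, and `isRetryable` are hypothetical names, not the PR's DefaultClassifier.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
)

// Hypothetical sentinels: one transient, one permanent despite similar text.
var (
	errBackendUnavailable = errors.New("backend unavailable")
	errFeatureUnavailable = errors.New("this feature is unavailable")
)

// isRetryable inspects the error chain rather than the message text.
func isRetryable(err error) bool {
	if err == nil {
		return false
	}
	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, errBackendUnavailable) {
		return true
	}
	var ne net.Error
	return errors.As(err, &ne) && ne.Timeout()
}

func main() {
	fmt.Println(isRetryable(fmt.Errorf("fetch chunk: %w", errBackendUnavailable)))
	fmt.Println(isRetryable(errFeatureUnavailable))
}
```

This only works where the errors reaching the classifier are wrapped with %w or carry typed values; errors that arrive as plain strings from a remote peer would still need some text-based rule.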
M4. Bloom Filter False Positives Accumulate Without Secondary Check
IsProtected() returns true on bloom filter match. With DefaultMaxSyncProtections = 1000000, false positive rates compound across iterations. No secondary exact check exists. This may cause significant storage bloat over time.
M5. fileName Field Overloaded for Soft-Delete Encoding
fileName: fmt.Sprintf("soft_delete_object:%v", isTombstone)Uses fileName as semantic carrier. Code checking len(e.fileName) != 0 to detect S3 objects will misidentify soft-delete entries.
M6. Zero Unit Tests for UpstreamSQLHelper
1,561 lines, 25 methods, ~110 error paths — all untested. This is the most security-sensitive component (upstream SQL execution).
M7. No Replication Lag Metric
Critical for operations — no way to monitor how far downstream is behind upstream.
M8. Hardcoded /tmp/test_apply_objects in Tests
Parallel test runs will conflict. Use t.TempDir().
M9. vector.AppendBytes/AppendFixed Errors Silently Discarded
In logtailreplay/object_list.go, all vector.Append* calls in fillInObjectListFn discard errors. If mpool allocation fails, batches have inconsistent vector lengths → downstream panics.
🔵 LOW
- Object pool reset fragility: Release* functions clear known fields but are brittle if struct gains new fields
- globalMemoryControllerOnce + Mutex double pattern: SetGlobalMemoryController bypasses sync.Once
- checkLeaseReadRows callback always returns false on first call: correct but fragile
- Table-level merge exclusion: IsFromPublication() skips merge for entire table, not just replicated objects
- 20-min TTL vs ~1.5hr sync jobs: aggressive cleanup risk during network partitions
Summary
| Severity | Count | Top Concerns |
|---|---|---|
| 🔴 CRITICAL | 9 | SQL injection (3 vectors), executor deadlock/panic (3), commit ordering data leak, iterator leak, metrics dead |
| 🟠 HIGH | 12 | Map race, tombstone silent corruption, partial state, resource leaks, semaphore, connection parsing |
| 🟡 MEDIUM | 9 | DDL atomicity, escaping inconsistency, no tests for SQL helper, no lag metric |
| 🔵 LOW | 5 | Pool reset, merge scope, TTL |
Top 3 Recommendations:
- Security audit all SQL in publication_subscription.go and object_list.go: standardize on escapeSQLString() or parameterized queries
- Fix executor lifecycle: panic recovery, deadlock in Stop(), race on worker
- Fix commit ordering: OnTxnCommit must execute after commit succeeds to prevent orphaned S3 objects
- C1-C3: Fix SQL injection in publication_subscription.go, object_list.go, sql_builder.go
- C4-C6: Fix executor lifecycle (race condition, panic, deadlock)
- C7: Move OnTxnCommit after actual commit succeeds
- C8: Add iter.Release() for BTree iterators
- C9: Register missing CCPR Prometheus metrics
- H1: Add sync.RWMutex to AObjectMap
- H2: Log warning for unmapped tombstone objects
- H4: Fix resource leaks in ddl.go error paths
- H5: Fix semaphore mismatch in MemoryController
- H6: Rename gcObjectsAsync to gcObjects
- H7: Fix connection string @ split
- H8-H11: Panic recovery, ticker leak, context, commit semantics
- H12: Propagate UpdateIterationState errors
- M1-M9: Various medium-priority improvements
- LOW: Pool reset, globalMemoryController, misc cleanup
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Fix Test_insertCCPRDbAndTableRecords_EscapesStrings: use sanitizeSQLInput instead of escapeSQLString (which wraps with quotes)
- Fix TestParseUpstreamConn/missing_@_separator: update expected error message
- Fix TestGetChunkJob retryable tests: add 'backend unavailable' to retryable patterns (was broken when narrowing from broad 'backend'/'unavailable')
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
# Conflicts:
# pkg/sql/parsers/dialect/mysql/mysql_sql.go
# pkg/sql/parsers/tree/data_branch.go
# pkg/sql/parsers/tree/data_branch_test.go
1. data_branch_test.go: Add missing closing brace for TestObjectListFormat_GoodPath before TestDataBranchPickLifecycle, fixing syntax error.
2. executor.go: Remove unsafe recover() in PublicationTaskExecutor.run() which is not in molint whitelist, and remove unused runtime/debug import.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1. ddl.go: Fix canDoColumnChangesInplace to prioritize name-based matching over position-based matching, preventing false positives when columns swap positions but keep the same type.
2. types.go: Remove unused isSoftDeleteEntry in disttae (already defined and used in tae/rpc/handle.go), and remove now-unused strings import.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
# Conflicts:
# pkg/sql/parsers/dialect/mysql/mysql_sql.go
- Fix object_list.go: return explicit errors when snapshot resolution fails instead of silently falling back to MinTs()/getCurrentTS()
- Regenerate mysql_sql.go from merged grammar file
- Resolve merge conflicts
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…tream_sql_helper The UpstreamSQLHelper was checking for "_" (underscore) to skip setting AgainstSnapshot, but the protocol convention uses "-" (dash) as the sentinel value for empty against snapshot. This caused the snapshot name "-" to be treated as a real snapshot name, failing resolution. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Merge Queue Status
This pull request spent 18 seconds in the queue, including 1 second running CI.
What type of PR is this?
Which issue(s) this PR fixes:
issue #23525
What this PR does / why we need it:
https://github.com/jiangxinmeng1/matrixone/blob/s3_sync/pkg/cdc/S3_SYNC_OVERVIEW.md