Skip to content

Commit 00542f5

Browse files
committed
fix: ctid parameter serialization failure in batched sync (v7.0.11)
1 parent 026c3b1 commit 00542f5

3 files changed

Lines changed: 61 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
All notable changes to this project will be documented in this file.
44

5+
## [7.0.11] - 2025-12-10
6+
7+
### Fixed
8+
9+
- **Critical: ctid parameter serialization failure in batched sync**: Fixed bug where batched xmin sync would fail after the first batch with "cannot convert between Rust type `&String` and Postgres type `tid`". The ctid value was being passed as a parameterized query argument, but tokio-postgres cannot serialize Rust strings to PostgreSQL's tid type. Now inlines the ctid value in the query (with format validation to prevent SQL injection).
10+
511
## [7.0.10] - 2025-12-10
612

713
### Fixed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "database-replicator"
3-
version = "7.0.10"
3+
version = "7.0.11"
44
edition = "2021"
55
license = "Apache-2.0"
66
description = "Universal database-to-PostgreSQL replication CLI. Supports PostgreSQL, SQLite, MongoDB, and MySQL."

src/xmin/reader.rs

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,27 @@ pub fn detect_wraparound(old_xmin: u32, current_xmin: u32) -> WraparoundCheck {
4747
}
4848
}
4949

50+
/// Validate that a ctid string has the correct format "(page,tuple)".
51+
///
52+
/// ctid is a PostgreSQL system column representing the physical location of a row.
53+
/// Format is "(page_number,tuple_index)" where both are non-negative integers.
54+
/// Examples: "(0,1)", "(123,45)", "(0,100)"
55+
///
56+
/// We validate before inlining in SQL to prevent injection attacks.
57+
fn is_valid_ctid(s: &str) -> bool {
58+
let s = s.trim();
59+
if !s.starts_with('(') || !s.ends_with(')') {
60+
return false;
61+
}
62+
let inner = &s[1..s.len() - 1];
63+
let parts: Vec<&str> = inner.split(',').collect();
64+
if parts.len() != 2 {
65+
return false;
66+
}
67+
// Both parts must be valid unsigned integers
68+
parts[0].trim().parse::<u64>().is_ok() && parts[1].trim().parse::<u32>().is_ok()
69+
}
70+
5071
/// Reads changed rows from a PostgreSQL table using xmin-based change detection.
5172
///
5273
/// PostgreSQL's `xmin` system column contains the transaction ID that last modified
@@ -200,14 +221,21 @@ impl<'a> XminReader<'a> {
200221
// Use (xmin, ctid) as compound pagination key to handle duplicate xmin values.
201222
// ctid is the physical tuple location and provides a stable tie-breaker.
202223
let (query, rows) = if let Some(ref last_ctid) = batch_reader.last_ctid {
203-
// Subsequent batches: use compound (xmin, ctid) > ($1, $2) filter
224+
// Validate ctid format for safety before inlining in query.
225+
// ctid format is "(page,tuple)" e.g., "(0,1)" or "(123,45)"
226+
if !is_valid_ctid(last_ctid) {
227+
anyhow::bail!("Invalid ctid format: {}", last_ctid);
228+
}
229+
230+
// Subsequent batches: use compound (xmin, ctid) > ($1, 'ctid'::tid) filter
231+
// Note: ctid must be inlined because tokio-postgres can't serialize String to tid type
204232
let query = format!(
205233
"SELECT {}, xmin::text::bigint as _xmin, ctid::text as _ctid \
206234
FROM \"{}\".\"{}\" \
207-
WHERE (xmin::text::bigint, ctid) > ($1, $2::tid) \
235+
WHERE (xmin::text::bigint, ctid) > ($1, '{}'::tid) \
208236
ORDER BY xmin::text::bigint, ctid \
209-
LIMIT $3",
210-
column_list, batch_reader.schema, batch_reader.table
237+
LIMIT $2",
238+
column_list, batch_reader.schema, batch_reader.table, last_ctid
211239
);
212240

213241
let rows = self
@@ -216,7 +244,6 @@ impl<'a> XminReader<'a> {
216244
&query,
217245
&[
218246
&(batch_reader.current_xmin as i64),
219-
&last_ctid,
220247
&(batch_reader.batch_size as i64),
221248
],
222249
)
@@ -591,4 +618,26 @@ mod tests {
591618
WraparoundCheck::WraparoundDetected
592619
);
593620
}
621+
622+
#[test]
623+
fn test_is_valid_ctid() {
624+
// Valid ctid formats
625+
assert!(is_valid_ctid("(0,1)"));
626+
assert!(is_valid_ctid("(123,45)"));
627+
assert!(is_valid_ctid("(0,100)"));
628+
assert!(is_valid_ctid("(999999,65535)"));
629+
assert!(is_valid_ctid(" (0,1) ")); // Whitespace trimmed
630+
631+
// Invalid formats
632+
assert!(!is_valid_ctid("0,1")); // Missing parentheses
633+
assert!(!is_valid_ctid("(0,1")); // Missing closing paren
634+
assert!(!is_valid_ctid("0,1)")); // Missing opening paren
635+
assert!(!is_valid_ctid("(0)")); // Missing tuple index
636+
assert!(!is_valid_ctid("(0,1,2)")); // Too many parts
637+
assert!(!is_valid_ctid("(a,1)")); // Non-numeric page
638+
assert!(!is_valid_ctid("(0,b)")); // Non-numeric tuple
639+
assert!(!is_valid_ctid("")); // Empty string
640+
assert!(!is_valid_ctid("()")); // Empty parens
641+
assert!(!is_valid_ctid("(-1,1)")); // Negative page (parses as invalid)
642+
}
594643
}

0 commit comments

Comments
 (0)