Skip to content

Commit e6cab4c

Browse files
committed
WIP
1 parent d89dcac commit e6cab4c

27 files changed

Lines changed: 599 additions & 161 deletions

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,7 @@ class BeamModulePlugin implements Plugin<Project> {
763763
// [bomupgrader] the BOM version is set by scripts/tools/bomupgrader.py. If update manually, also update
764764
// libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml
765765
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.79.0",
766+
google_cloud_spanner_bom : "com.google.cloud:google-cloud-spanner-bom:6.113.0",
766767
google_cloud_secret_manager : "com.google.cloud:google-cloud-secretmanager", // google_cloud_platform_libraries_bom sets version
767768
google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
768769
google_cloud_storage : "com.google.cloud:google-cloud-storage", // google_cloud_platform_libraries_bom sets version

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1718,6 +1718,11 @@ public BoundedWindow window() {
17181718
return currentWindow;
17191719
}
17201720

1721+
@Override
1722+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
1723+
return currentElement.causedByDrain();
1724+
}
1725+
17211726
@Override
17221727
public Object sideInput(String tagId) {
17231728
return sideInput(sideInputMapping.get(tagId));

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
8888
import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.CacheFactory;
8989
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
90+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
9091
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames;
9192
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
9293
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
@@ -1746,6 +1747,8 @@ public abstract static class ReadChangeStream
17461747

17471748
abstract String getChangeStreamName();
17481749

1750+
abstract @Nullable List<String> getTvfNameList();
1751+
17491752
abstract @Nullable String getMetadataInstance();
17501753

17511754
abstract @Nullable String getMetadataDatabase();
@@ -1783,6 +1786,8 @@ abstract static class Builder {
17831786

17841787
abstract Builder setChangeStreamName(String changeStreamName);
17851788

1789+
abstract Builder setTvfNameList(List<String> tvfNameList);
1790+
17861791
abstract Builder setMetadataInstance(String metadataInstance);
17871792

17881793
abstract Builder setMetadataDatabase(String metadataDatabase);
@@ -1861,6 +1866,11 @@ public ReadChangeStream withChangeStreamName(String changeStreamName) {
18611866
return toBuilder().setChangeStreamName(changeStreamName).build();
18621867
}
18631868

1869+
/** Specifies the list of TVF names to query and union. */
1870+
public ReadChangeStream withTvfNameList(List<String> tvfNameList) {
1871+
return toBuilder().setTvfNameList(tvfNameList).build();
1872+
}
1873+
18641874
/** Specifies the metadata database. */
18651875
public ReadChangeStream withMetadataInstance(String metadataInstance) {
18661876
return toBuilder().setMetadataInstance(metadataInstance).build();
@@ -2042,6 +2052,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
20422052
getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
20432053
? MAX_INCLUSIVE_END_AT
20442054
: getInclusiveEndAt();
2055+
final List<String> tvfNameList = getTvfNameList();
20452056
final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect);
20462057
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
20472058
final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
@@ -2051,10 +2062,19 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
20512062
isMutableChangeStream(
20522063
spannerAccessor.getDatabaseClient(), changeStreamDatabaseDialect, changeStreamName);
20532064
LOG.info("The change stream {} is mutable: {}", changeStreamName, isMutableChangeStream);
2065+
if (tvfNameList != null && !tvfNameList.isEmpty()) {
2066+
if (!isMutableChangeStream) {
2067+
throw new IllegalArgumentException(
2068+
"tvfNameList is only supported for change streams with MUTABLE_KEY_RANGE mode");
2069+
}
2070+
// TODO: if !per_placement_tvf=true, throw exception.
2071+
checkTvfExistence(spannerAccessor.getDatabaseClient(), tvfNameList);
2072+
}
20542073
final DaoFactory daoFactory =
20552074
new DaoFactory(
20562075
changeStreamSpannerConfig,
20572076
changeStreamName,
2077+
tvfNameList,
20582078
partitionMetadataSpannerConfig,
20592079
partitionMetadataTableNames,
20602080
rpcPriority,
@@ -2754,6 +2774,44 @@ static String resolveSpannerProjectId(SpannerConfig config) {
27542774
: config.getProjectId().get();
27552775
}
27562776

2777+
@VisibleForTesting
2778+
static void checkTvfExistence(DatabaseClient databaseClient, List<String> tvfNameList) {
2779+
if (tvfNameList == null || tvfNameList.isEmpty()) {
2780+
return;
2781+
}
2782+
try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
2783+
StringBuilder inClauseBuilder = new StringBuilder();
2784+
for (int i = 0; i < tvfNameList.size(); i++) {
2785+
inClauseBuilder
2786+
.append("'")
2787+
.append(PartitionMetadataDao.escapeTvfName(tvfNameList.get(i)))
2788+
.append("'");
2789+
if (i < tvfNameList.size() - 1) {
2790+
inClauseBuilder.append(", ");
2791+
}
2792+
}
2793+
String inClause = inClauseBuilder.toString();
2794+
// Note routine_type for GSQL is 'TABLE FUNCTION', for PostgreSQL is 'FUNCTION'.
2795+
String query =
2796+
"SELECT routine_name FROM information_schema.routines WHERE routine_type LIKE '%FUNCTION' AND routine_name IN ("
2797+
+ inClause
2798+
+ ")";
2799+
2800+
Statement statement = Statement.newBuilder(query).build();
2801+
ResultSet resultSet = tx.executeQuery(statement);
2802+
java.util.Set<String> foundNames = new java.util.HashSet<>();
2803+
while (resultSet.next()) {
2804+
foundNames.add(resultSet.getString(0));
2805+
}
2806+
for (String tvfName : tvfNameList) {
2807+
if (!foundNames.contains(PartitionMetadataDao.escapeTvfName(tvfName))) {
2808+
throw new IllegalArgumentException(
2809+
"TVF specified: " + tvfName + " is not found in the existing TVF's: " + foundNames);
2810+
}
2811+
}
2812+
}
2813+
}
2814+
27572815
@VisibleForTesting
27582816
static boolean isMutableChangeStream(
27592817
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.cloud.Timestamp;
2121
import com.google.cloud.spanner.Options.RpcPriority;
2222
import java.util.Collections;
23+
import java.util.List;
2324
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
2425
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
2526
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
@@ -64,6 +65,18 @@ public class ChangeStreamsConstants {
6465
/** The sliding window size in seconds for throughput reporting. */
6566
public static final int THROUGHPUT_WINDOW_SECONDS = 10;
6667

68+
/**
69+
* The delimiter used to separate the partition token and the tvf name. Note this string does not
70+
* exist in the partition token itself.
71+
*/
72+
public static final String PARTITION_TOKEN_TVF_NAME_DELIMITER = "#";
73+
74+
/** The default tvf name for a change stream query is the empty {@link String}. */
75+
public static final String DEFAULT_TVF_NAME = "";
76+
77+
/** The default tvf name list to query and union is empty {@link Collections.emptyList()}. */
78+
public static final List<String> DEFAULT_TVF_NAME_LIST = Collections.emptyList();
79+
6780
/**
6881
* We use the following partition token to provide an estimate size of a partition token. A usual
6982
* partition token has around 140 characters.
@@ -85,6 +98,7 @@ public class ChangeStreamsConstants {
8598
.setState(State.CREATED)
8699
.setWatermark(Timestamp.now())
87100
.setCreatedAt(Timestamp.now())
101+
.setTvfName(DEFAULT_TVF_NAME)
88102
.build();
89103

90104
/**

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,17 @@ private void processChildPartition(
155155
record.getStartTimestamp(),
156156
partition.getEndTimestamp(),
157157
partition.getHeartbeatMillis(),
158+
partition.getTvfName(),
158159
childPartition);
159160
LOG.debug("[{}] Inserting child partition token {}", partitionToken, childPartitionToken);
160161
final Boolean insertedRow =
161162
partitionMetadataDao
162163
.runInTransaction(
163164
transaction -> {
164-
if (transaction.getPartition(childPartitionToken) == null) {
165+
if (transaction.getPartition(
166+
PartitionMetadataDao.composePartitionTokenWithTvfName(
167+
childPartitionToken, partition.getTvfName()))
168+
== null) {
165169
transaction.insert(row);
166170
return true;
167171
} else {
@@ -188,13 +192,15 @@ private PartitionMetadata toPartitionMetadata(
188192
Timestamp startTimestamp,
189193
Timestamp endTimestamp,
190194
long heartbeatMillis,
195+
String tvfName,
191196
ChildPartition childPartition) {
192197
return PartitionMetadata.newBuilder()
193198
.setPartitionToken(childPartition.getToken())
194199
.setParentTokens(childPartition.getParentTokens())
195200
.setStartTimestamp(startTimestamp)
196201
.setEndTimestamp(endTimestamp)
197202
.setHeartbeatMillis(heartbeatMillis)
203+
.setTvfName(tvfName)
198204
.setState(CREATED)
199205
.setWatermark(startTimestamp)
200206
.build();

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,14 @@ private ProcessContinuation schedulePartitions(
172172
}
173173

174174
private Timestamp updateBatchToScheduled(List<PartitionMetadata> batchPartitions) {
175-
final List<String> batchPartitionTokens =
175+
final List<String> batchComposedPartitionTokens =
176176
batchPartitions.stream()
177-
.map(PartitionMetadata::getPartitionToken)
177+
.map(
178+
partition ->
179+
PartitionMetadataDao.composePartitionTokenWithTvfName(
180+
partition.getPartitionToken(), partition.getTvfName()))
178181
.collect(Collectors.toList());
179-
return dao.updateToScheduled(batchPartitionTokens);
182+
return dao.updateToScheduled(batchComposedPartitionTokens);
180183
}
181184

182185
private void outputBatch(

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionStartRecordAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ private void processStartPartition(
138138
.setStartTimestamp(record.getStartTimestamp())
139139
.setEndTimestamp(partition.getEndTimestamp())
140140
.setHeartbeatMillis(partition.getHeartbeatMillis())
141+
.setTvfName(partition.getTvfName())
141142
.setState(CREATED)
142143
.setWatermark(record.getStartTimestamp())
143144
.build();
@@ -146,7 +147,10 @@ private void processStartPartition(
146147
partitionMetadataDao
147148
.runInTransaction(
148149
transaction -> {
149-
if (transaction.getPartition(startPartitionToken) == null) {
150+
if (transaction.getPartition(
151+
PartitionMetadataDao.composePartitionTokenWithTvfName(
152+
startPartitionToken, partition.getTvfName()))
153+
== null) {
150154
transaction.insert(row);
151155
return true;
152156
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,14 @@ public ProcessContinuation run(
185185
ManualWatermarkEstimator<Instant> watermarkEstimator,
186186
BundleFinalizer bundleFinalizer) {
187187
final String token = partition.getPartitionToken();
188+
final String tvfName = partition.getTvfName();
188189

189190
// TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
190191
// ReadChangeStreamPartitionDoFn#processElement is called
191192
final PartitionMetadata updatedPartition =
192-
Optional.ofNullable(partitionMetadataDao.getPartition(token))
193+
Optional.ofNullable(
194+
partitionMetadataDao.getPartition(
195+
PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName)))
193196
.map(partitionMetadataMapper::from)
194197
.orElseThrow(
195198
() ->
@@ -223,7 +226,11 @@ public ProcessContinuation run(
223226

224227
try (ChangeStreamResultSet resultSet =
225228
changeStreamDao.changeStreamQuery(
226-
token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {
229+
token,
230+
tvfName,
231+
startTimestamp,
232+
changeStreamQueryEndTimestamp,
233+
partition.getHeartbeatMillis())) {
227234

228235
metrics.incQueryCounter();
229236
while (resultSet.next()) {
@@ -298,7 +305,9 @@ public ProcessContinuation run(
298305
LOG.debug("[{}] Continuation present, returning {}", token, maybeContinuation);
299306
bundleFinalizer.afterBundleCommit(
300307
Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
301-
updateWatermarkCallback(token, watermarkEstimator));
308+
updateWatermarkCallback(
309+
PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName),
310+
watermarkEstimator));
302311
return maybeContinuation.get();
303312
}
304313
}
@@ -361,25 +370,27 @@ public ProcessContinuation run(
361370
LOG.debug("[{}] Finishing partition", token);
362371
// TODO: This should be performed after the commit succeeds. Since bundle finalizers are not
363372
// guaranteed to be called, this needs to be performed in a subsequent fused stage.
364-
partitionMetadataDao.updateToFinished(token);
373+
partitionMetadataDao.updateToFinished(
374+
PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName));
365375
metrics.decActivePartitionReadCounter();
366376
LOG.info("[{}] After attempting to finish the partition", token);
367377
return ProcessContinuation.stop();
368378
}
369379

370380
private BundleFinalizer.Callback updateWatermarkCallback(
371-
String token, WatermarkEstimator<Instant> watermarkEstimator) {
381+
String compositedToken, WatermarkEstimator<Instant> watermarkEstimator) {
372382
return () -> {
373383
final Instant watermark = watermarkEstimator.currentWatermark();
374-
LOG.debug("[{}] Updating current watermark to {}", token, watermark);
384+
LOG.debug("[{}] Updating current watermark to {}", compositedToken, watermark);
375385
try {
376386
partitionMetadataDao.updateWatermark(
377-
token, Timestamp.ofTimeMicroseconds(watermark.getMillis() * 1_000L));
387+
compositedToken, Timestamp.ofTimeMicroseconds(watermark.getMillis() * 1_000L));
378388
} catch (SpannerException e) {
379389
if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
380-
LOG.debug("[{}] Unable to update the current watermark, partition NOT FOUND", token);
390+
LOG.debug(
391+
"[{}] Unable to update the current watermark, partition NOT FOUND", compositedToken);
381392
} else {
382-
LOG.error("[{}] Error updating the current watermark", token, e);
393+
LOG.error("[{}] Error updating the current watermark", compositedToken, e);
383394
}
384395
}
385396
};

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
1919

20+
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_TVF_NAME;
21+
2022
import com.google.cloud.Timestamp;
2123
import com.google.cloud.spanner.DatabaseClient;
2224
import com.google.cloud.spanner.Dialect;
@@ -83,6 +85,7 @@ public class ChangeStreamDao {
8385
*/
8486
public ChangeStreamResultSet changeStreamQuery(
8587
String partitionToken,
88+
String tvfName,
8689
Timestamp startTimestamp,
8790
Timestamp endTimestamp,
8891
long heartbeatMillis) {
@@ -95,10 +98,14 @@ public ChangeStreamResultSet changeStreamQuery(
9598
if (this.isPostgres()) {
9699
// Ensure we have determined whether change stream uses mutable key range
97100
if (this.isMutableChangeStream) {
98-
query =
99-
"SELECT * FROM \"spanner\".\"read_proto_bytes_"
100-
+ changeStreamName
101-
+ "\"($1, $2, $3, $4, null)";
101+
if (tvfName == null || tvfName.equals(DEFAULT_TVF_NAME)) {
102+
query =
103+
"SELECT * FROM \"spanner\".\"read_proto_bytes_"
104+
+ changeStreamName
105+
+ "\"($1, $2, $3, $4, null)";
106+
} else {
107+
query = "SELECT * FROM \"spanner\".\"" + tvfName + "\"($1, $2, $3, $4, null)";
108+
}
102109
} else {
103110
query =
104111
"SELECT * FROM \"spanner\".\"read_json_"
@@ -117,16 +124,29 @@ public ChangeStreamResultSet changeStreamQuery(
117124
.to(heartbeatMillis)
118125
.build();
119126
} else {
120-
query =
121-
"SELECT * FROM READ_"
122-
+ changeStreamName
123-
+ "("
124-
+ " start_timestamp => @startTimestamp,"
125-
+ " end_timestamp => @endTimestamp,"
126-
+ " partition_token => @partitionToken,"
127-
+ " read_options => null,"
128-
+ " heartbeat_milliseconds => @heartbeatMillis"
129-
+ ")";
127+
if (this.isMutableChangeStream && tvfName != null && !tvfName.equals(DEFAULT_TVF_NAME)) {
128+
query =
129+
"SELECT * FROM "
130+
+ tvfName
131+
+ "("
132+
+ " start_timestamp => @startTimestamp,"
133+
+ " end_timestamp => @endTimestamp,"
134+
+ " partition_token => @partitionToken,"
135+
+ " read_options => null,"
136+
+ " heartbeat_milliseconds => @heartbeatMillis"
137+
+ ")";
138+
} else {
139+
query =
140+
"SELECT * FROM READ_"
141+
+ changeStreamName
142+
+ "("
143+
+ " start_timestamp => @startTimestamp,"
144+
+ " end_timestamp => @endTimestamp,"
145+
+ " partition_token => @partitionToken,"
146+
+ " read_options => null,"
147+
+ " heartbeat_milliseconds => @heartbeatMillis"
148+
+ ")";
149+
}
130150
statement =
131151
Statement.newBuilder(query)
132152
.bind("startTimestamp")

0 commit comments

Comments
 (0)