Skip to content

Commit 40e715f

Browse files
committed
feat(spanner): add support for new cloud client test framework in google-cloud-spanner-executor
1 parent 4477288 commit 40e715f

11 files changed

Lines changed: 1213 additions & 5 deletions

File tree

.github/workflows/system-tests-against-emulator.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,4 @@
3131
# working-directory: handwritten/spanner
3232
# env:
3333
# SPANNER_EMULATOR_HOST: localhost:9010
34-
# GCLOUD_PROJECT: emulator-test-project
34+
# GCLOUD_PROJECT: emulator-test-project
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
/*!
2+
* Copyright 2026 Google LLC. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import {ServerDuplexStream, status} from '@grpc/grpc-js';
18+
import {Spanner} from '../../src';
19+
import {trace, context, Tracer} from '@opentelemetry/api';
20+
import * as protos from '../../protos/protos';
21+
import {CloudUtil} from './cloud-util';
22+
import {OutcomeSender, ExecutionFlowContextInterface} from './cloud-executor';
23+
import spanner = protos.google.spanner;
24+
import SpannerAsyncActionRequest = spanner.executor.v1.SpannerAsyncActionRequest;
25+
import SpannerAsyncActionResponse = spanner.executor.v1.SpannerAsyncActionResponse;
26+
import ISpannerAction = spanner.executor.v1.ISpannerAction;
27+
import IAdminAction = spanner.executor.v1.IAdminAction;
28+
import ICreateCloudInstanceAction = spanner.executor.v1.ICreateCloudInstanceAction;
29+
30+
/**
31+
* Context for a single stream connection.
32+
*/
33+
export class ExecutionFlowContext implements ExecutionFlowContextInterface {
34+
private call: ServerDuplexStream<
35+
SpannerAsyncActionRequest,
36+
SpannerAsyncActionResponse
37+
>;
38+
39+
constructor(
40+
call: ServerDuplexStream<
41+
SpannerAsyncActionRequest,
42+
SpannerAsyncActionResponse
43+
>,
44+
) {
45+
this.call = call;
46+
}
47+
48+
/**
49+
* Sends a response back to the client.
50+
*/
51+
public onNext(response: SpannerAsyncActionResponse): void {
52+
53+
// Prevent writing if client cancelled the call, or the underlying Node stream is un-writable/destroyed
54+
if (this.call.cancelled || this.call.destroyed || this.call.writable === false) {
55+
console.warn('Attempted to write to a closed or cancelled stream.');
56+
return;
57+
}
58+
59+
this.call.write(response);
60+
}
61+
62+
/**
63+
* Sends an error back to the client.
64+
*/
65+
public onError(error: Error): void {
66+
const stream = this.call as any;
67+
68+
if (this.call.cancelled || stream.destroyed || stream.writable === false) {
69+
console.warn(
70+
'Attempted to emit error to a closed or cancelled stream.',
71+
error,
72+
);
73+
return;
74+
}
75+
76+
this.call.emit('error', error);
77+
}
78+
79+
/**
80+
* Clean up resources associated with the context.
81+
*/
82+
public cleanup(): void {
83+
console.log('Cleaning up ExecutionFlowContext');
84+
}
85+
}
86+
87+
export class CloudClientExecutor {
88+
private spanner: Spanner;
89+
private tracer: Tracer;
90+
91+
constructor() {
92+
const spannerOptions = CloudUtil.getSpannerOptions();
93+
this.spanner = new Spanner(spannerOptions);
94+
this.tracer = trace.getTracer(CloudClientExecutor.name);
95+
}
96+
97+
/**
98+
* Creates a new ExecutionFlowContext for a stream.
99+
*/
100+
public createExecutionFlowContext(
101+
call: ServerDuplexStream<
102+
SpannerAsyncActionRequest,
103+
SpannerAsyncActionResponse
104+
>,
105+
): ExecutionFlowContext {
106+
return new ExecutionFlowContext(call);
107+
}
108+
109+
/**
110+
* Starts handling a SpannerAsyncActionRequest.
111+
*/
112+
public startHandlingRequest(
113+
req: SpannerAsyncActionRequest,
114+
executionContext: ExecutionFlowContext,
115+
): {code: number; details: string} {
116+
const outcomeSender = new OutcomeSender(req.actionId!, executionContext);
117+
118+
if (!req.action) {
119+
return outcomeSender.finishWithError({
120+
code: status.INVALID_ARGUMENT,
121+
message: 'Invalid request: No action present',
122+
});
123+
}
124+
this.executeAction(outcomeSender, req.action).catch(err => {
125+
console.error('Unhandled exception in action execution:', err);
126+
outcomeSender.finishWithError(err);
127+
});
128+
129+
return {code: status.OK, details: ''};
130+
}
131+
132+
/**
133+
* Determines the specific Spanner action type and routes it to the appropriate handler.
134+
*/
135+
private async executeAction(
136+
outcomeSender: OutcomeSender,
137+
action: ISpannerAction,
138+
): Promise<void> {
139+
const actionType =
140+
Object.keys(action).find(
141+
k => action[k as keyof typeof action] !== undefined,
142+
) || 'unknown';
143+
const span = this.tracer.startSpan(`performaction_${actionType}`);
144+
145+
return context.with(trace.setSpan(context.active(), span), async () => {
146+
try {
147+
if (action.admin) {
148+
await this.executeAdminAction(action.admin, outcomeSender);
149+
return;
150+
}
151+
152+
outcomeSender.finishWithError({
153+
code: status.UNIMPLEMENTED,
154+
message: `Action ${actionType} not implemented yet`,
155+
});
156+
} catch (e: any) {
157+
span.recordException(e);
158+
console.error('Unexpected error:', e);
159+
outcomeSender.finishWithError({
160+
code: status.INVALID_ARGUMENT,
161+
message: `Unexpected error: ${e.message}`,
162+
});
163+
} finally {
164+
span.end();
165+
}
166+
});
167+
}
168+
169+
private async executeAdminAction(
170+
action: IAdminAction,
171+
sender: OutcomeSender,
172+
): Promise<void> {
173+
try {
174+
if (action.createCloudInstance) {
175+
await this.executeCreateCloudInstance(
176+
action.createCloudInstance,
177+
sender,
178+
);
179+
return;
180+
}
181+
sender.finishWithError({
182+
code: status.UNIMPLEMENTED,
183+
message: 'Admin action not implemented',
184+
});
185+
} catch (e: any) {
186+
sender.finishWithError(e);
187+
}
188+
}
189+
190+
private async executeCreateCloudInstance(
191+
action: ICreateCloudInstanceAction,
192+
sender: OutcomeSender,
193+
): Promise<void> {
194+
try {
195+
console.log(`Creating instance: \n${JSON.stringify(action, null, 2)}`);
196+
197+
const instanceId = action.instanceId!;
198+
const projectId = action.projectId!;
199+
const configId = action.instanceConfigId!;
200+
201+
const instanceAdminClient = this.spanner.getInstanceAdminClient();
202+
203+
const [operation] = await instanceAdminClient.createInstance({
204+
parent: instanceAdminClient.projectPath(projectId),
205+
instanceId: instanceId,
206+
instance: {
207+
config: instanceAdminClient.instanceConfigPath(projectId, configId),
208+
displayName: instanceId,
209+
nodeCount: action.nodeCount || 1,
210+
processingUnits: action.processingUnits,
211+
labels: action.labels || {},
212+
},
213+
});
214+
215+
console.log('Waiting for instance creation operation to complete...');
216+
217+
await operation.promise();
218+
219+
console.log(`Instance ${instanceId} created successfully.`);
220+
221+
sender.finishWithOK();
222+
} catch (err: any) {
223+
if (err.code === status.ALREADY_EXISTS) {
224+
console.log('Instance already exists, returning OK.');
225+
sender.finishWithOK();
226+
return;
227+
}
228+
console.error('Failed to create instance:', err);
229+
sender.finishWithError(err);
230+
}
231+
}
232+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*!
2+
* Copyright 2026 Google LLC. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import {ServerDuplexStream, status, ServiceError} from '@grpc/grpc-js';
18+
import {trace, context, Tracer} from '@opentelemetry/api';
19+
import {CloudClientExecutor} from './cloud-client-executor';
20+
import * as protos from '../../protos/protos';
21+
import spanner = protos.google.spanner;
22+
import SpannerAsyncActionRequest = spanner.executor.v1.SpannerAsyncActionRequest;
23+
import SpannerAsyncActionResponse = spanner.executor.v1.SpannerAsyncActionResponse;
24+
25+
/**
26+
* Implements the SpannerExecutorProxy service, which handles asynchronous
27+
* Spanner actions via a bidirectional gRPC stream.
28+
*/
29+
export class CloudExecutorImpl {
30+
private clientExecutor: CloudClientExecutor;
31+
private tracer: Tracer;
32+
33+
constructor() {
34+
this.clientExecutor = new CloudClientExecutor();
35+
36+
this.tracer = trace.getTracer(CloudClientExecutor.name);
37+
}
38+
39+
/**
40+
* Handles incoming SpannerAsyncActionRequest messages from the client.
41+
*/
42+
public executeActionAsync(
43+
call: ServerDuplexStream<
44+
SpannerAsyncActionRequest,
45+
SpannerAsyncActionResponse
46+
>,
47+
): void {
48+
// Create a top-level OpenTelemetry span for streaming request.
49+
const span = this.tracer.startSpan(
50+
'nodejs_systest_execute_actions_stream',
51+
{
52+
root: true,
53+
},
54+
);
55+
56+
const streamContext = trace.setSpan(context.active(), span);
57+
58+
// The executionContext manages the lifecycle and flow state for this specific gRPC stream context.
59+
const executionContext =
60+
this.clientExecutor.createExecutionFlowContext(call);
61+
62+
// Handle receiving requests on duplex stream
63+
// Handle incoming requests sequentially on the duplex stream.
64+
call.on('data', (request: SpannerAsyncActionRequest) => {
65+
context.with(streamContext, () => {
66+
console.log(`Receiving request: \n${JSON.stringify(request, null, 2)}`);
67+
68+
// Ensure nested properties exist before attempting to inject configuration overrides.
69+
if (!request.action) request.action = {};
70+
if (!request.action.spannerOptions) request.action.spannerOptions = {};
71+
if (!request.action.spannerOptions.sessionPoolOptions)
72+
request.action.spannerOptions.sessionPoolOptions = {};
73+
74+
// as the multiplexed session is defualt enabled, mutate the incoming request to forcefully enforce multiplexed sessions for this proxy execution.
75+
request.action.spannerOptions.sessionPoolOptions.useMultiplexed = true;
76+
77+
console.log(
78+
`Updated request to set multiplexed session flag: \n${JSON.stringify(request, null, 2)}`,
79+
);
80+
81+
// TODO: Set requestHasReadOrQueryAction flag here when Read/Query are implemented.
82+
83+
try {
84+
const reqStatus = this.clientExecutor.startHandlingRequest(
85+
request,
86+
executionContext,
87+
);
88+
if (reqStatus.code !== status.OK) {
89+
console.error(
90+
`Failed to handle request, half closed: ${reqStatus.details}`,
91+
);
92+
}
93+
} catch (err) {
94+
console.error('Exception when handling request', err);
95+
}
96+
});
97+
});
98+
99+
// Handle stream errors
100+
call.on('error', (err: Error) => {
101+
context.with(streamContext, () => {
102+
console.error('Client ends the stream with error.', err);
103+
span.recordException(err);
104+
span.end();
105+
executionContext.cleanup();
106+
});
107+
});
108+
109+
// Handle the completion of the client stream
110+
call.on('end', async () => {
111+
await context.with(streamContext, async () => {
112+
span.end();
113+
// TODO: Add End-to-End trace verification here once Read/Query actions are implemented.
114+
console.log('Client called Done, half closed');
115+
executionContext.cleanup();
116+
117+
call.end();
118+
});
119+
});
120+
}
121+
}

0 commit comments

Comments
 (0)