Skip to content

Commit 4326f31

Browse files
committed
review comments
1 parent 40e715f commit 4326f31

4 files changed

Lines changed: 92 additions & 74 deletions

File tree

handwritten/spanner/google-cloud-spanner-executor/src/cloud-client-executor.ts

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,12 @@ export class ExecutionFlowContext implements ExecutionFlowContextInterface {
4949
* Sends a response back to the client.
5050
*/
5151
public onNext(response: SpannerAsyncActionResponse): void {
52-
5352
// Prevent writing if client cancelled the call, or the underlying Node stream is un-writable/destroyed
54-
if (this.call.cancelled || this.call.destroyed || this.call.writable === false) {
53+
if (
54+
this.call.cancelled ||
55+
this.call.destroyed ||
56+
this.call.writable === false
57+
) {
5558
console.warn('Attempted to write to a closed or cancelled stream.');
5659
return;
5760
}
@@ -84,10 +87,25 @@ export class ExecutionFlowContext implements ExecutionFlowContextInterface {
8487
}
8588
}
8689

90+
type ActionHandler = (action: any, sender: OutcomeSender) => Promise<void>;
91+
8792
export class CloudClientExecutor {
8893
private spanner: Spanner;
8994
private tracer: Tracer;
9095

96+
private readonly adminActionRegistry: Record<string, ActionHandler> = {
97+
createCloudInstance: (action, sender) =>
98+
this.executeCreateCloudInstance(
99+
action as ICreateCloudInstanceAction,
100+
sender,
101+
),
102+
};
103+
104+
private readonly actionRegistry: Record<string, ActionHandler> = {
105+
admin: (action, sender) =>
106+
this.executeAdminAction(action as IAdminAction, sender),
107+
};
108+
91109
constructor() {
92110
const spannerOptions = CloudUtil.getSpannerOptions();
93111
this.spanner = new Spanner(spannerOptions);
@@ -144,8 +162,12 @@ export class CloudClientExecutor {
144162

145163
return context.with(trace.setSpan(context.active(), span), async () => {
146164
try {
147-
if (action.admin) {
148-
await this.executeAdminAction(action.admin, outcomeSender);
165+
const handler = this.actionRegistry[actionType];
166+
if (handler) {
167+
await handler(
168+
action[actionType as keyof typeof action],
169+
outcomeSender,
170+
);
149171
return;
150172
}
151173

@@ -171,16 +193,21 @@ export class CloudClientExecutor {
171193
sender: OutcomeSender,
172194
): Promise<void> {
173195
try {
174-
if (action.createCloudInstance) {
175-
await this.executeCreateCloudInstance(
176-
action.createCloudInstance,
196+
const adminType = Object.keys(action).find(
197+
k => action[k as keyof typeof action] !== undefined,
198+
);
199+
200+
if (adminType && this.adminActionRegistry[adminType]) {
201+
await this.adminActionRegistry[adminType](
202+
action[adminType as keyof typeof action],
177203
sender,
178204
);
179205
return;
180206
}
207+
181208
sender.finishWithError({
182209
code: status.UNIMPLEMENTED,
183-
message: 'Admin action not implemented',
210+
message: `Admin action ${adminType || 'unknown'} not implemented`,
184211
});
185212
} catch (e: any) {
186213
sender.finishWithError(e);
@@ -213,7 +240,6 @@ export class CloudClientExecutor {
213240
});
214241

215242
console.log('Waiting for instance creation operation to complete...');
216-
217243
await operation.promise();
218244

219245
console.log(`Instance ${instanceId} created successfully.`);

handwritten/spanner/google-cloud-spanner-executor/src/cloud-executor-impl.ts

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
import {ServerDuplexStream, status, ServiceError} from '@grpc/grpc-js';
17+
import {ServerDuplexStream, status} from '@grpc/grpc-js';
1818
import {trace, context, Tracer} from '@opentelemetry/api';
1919
import {CloudClientExecutor} from './cloud-client-executor';
2020
import * as protos from '../../protos/protos';
@@ -64,22 +64,7 @@ export class CloudExecutorImpl {
6464
call.on('data', (request: SpannerAsyncActionRequest) => {
6565
context.with(streamContext, () => {
6666
console.log(`Receiving request: \n${JSON.stringify(request, null, 2)}`);
67-
68-
// Ensure nested properties exist before attempting to inject configuration overrides.
69-
if (!request.action) request.action = {};
70-
if (!request.action.spannerOptions) request.action.spannerOptions = {};
71-
if (!request.action.spannerOptions.sessionPoolOptions)
72-
request.action.spannerOptions.sessionPoolOptions = {};
73-
74-
// as the multiplexed session is defualt enabled, mutate the incoming request to forcefully enforce multiplexed sessions for this proxy execution.
75-
request.action.spannerOptions.sessionPoolOptions.useMultiplexed = true;
76-
77-
console.log(
78-
`Updated request to set multiplexed session flag: \n${JSON.stringify(request, null, 2)}`,
79-
);
80-
8167
// TODO: Set requestHasReadOrQueryAction flag here when Read/Query are implemented.
82-
8368
try {
8469
const reqStatus = this.clientExecutor.startHandlingRequest(
8570
request,
@@ -107,8 +92,8 @@ export class CloudExecutorImpl {
10792
});
10893

10994
// Handle the completion of the client stream
110-
call.on('end', async () => {
111-
await context.with(streamContext, async () => {
95+
call.on('end', () => {
96+
context.with(streamContext, () => {
11297
span.end();
11398
// TODO: Add End-to-End trace verification here once Read/Query actions are implemented.
11499
console.log('Client called Done, half closed');

handwritten/spanner/google-cloud-spanner-executor/src/cloud-util.ts

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,49 @@
1717
import * as fs from 'fs';
1818
import * as grpc from '@grpc/grpc-js';
1919
import {WorkerProxy} from './worker-proxy';
20-
import { SpannerOptions } from '../../src';
20+
import {SpannerOptions} from '../../src';
21+
22+
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
23+
import {OTLPTraceExporter} from '@opentelemetry/exporter-trace-otlp-grpc';
24+
import {Resource} from '@opentelemetry/resources';
25+
import {ATTR_SERVICE_NAME} from '@opentelemetry/semantic-conventions';
26+
import {
27+
BatchSpanProcessor,
28+
TraceIdRatioBasedSampler,
29+
} from '@opentelemetry/sdk-trace-base';
30+
import {GoogleAuth} from 'google-auth-library';
2131

2232
/**
2333
* Provides utility methods for configuring the Cloud Spanner client for tests.
2434
*/
2535
export class CloudUtil {
36+
public static async setupOpenTelemetrySdk(): Promise<NodeTracerProvider> {
37+
const auth = new GoogleAuth({
38+
scopes: 'https://www.googleapis.com/auth/cloud-platform',
39+
keyFile: WorkerProxy.serviceKeyFile || undefined,
40+
});
41+
const authenticatedClient = await auth.getClient();
42+
43+
const traceExporter = new OTLPTraceExporter({
44+
url: 'https://telemetry.googleapis.com',
45+
credentials: grpc.credentials.combineChannelCredentials(
46+
grpc.credentials.createSsl(),
47+
grpc.credentials.createFromGoogleCredential(authenticatedClient as any),
48+
),
49+
});
50+
51+
const provider = new NodeTracerProvider({
52+
resource: new Resource({
53+
[ATTR_SERVICE_NAME]: 'spanner-node-worker-proxy',
54+
'gcp.project_id': WorkerProxy.PROJECT_ID,
55+
}) as any,
56+
sampler: new TraceIdRatioBasedSampler(1.0),
57+
spanProcessors: [new BatchSpanProcessor(traceExporter as any)],
58+
});
59+
60+
provider.register();
61+
return provider;
62+
}
2663
// If this is set too low, the peer server may return RESOURCE_EXHAUSTED errors if the response
2764
// error message causes the trailing headers to exceed this limit.
2865
private static readonly GRPC_MAX_HEADER_LIST_SIZE_BYTES = 10 * 1024 * 1024; // 10 MB
@@ -59,7 +96,8 @@ export class CloudUtil {
5996
// - default_authority is used for the HTTP/2 :authority header.
6097
(grpcOptions as grpc.ChannelOptions)['grpc.ssl_target_name_override'] =
6198
this.TEST_HOST_IN_CERT;
62-
(grpcOptions as grpc.ChannelOptions)['grpc.default_authority'] = this.TEST_HOST_IN_CERT;
99+
(grpcOptions as grpc.ChannelOptions)['grpc.default_authority'] =
100+
this.TEST_HOST_IN_CERT;
63101
}
64102

65103
(options as any).grpcOptions = grpcOptions;

handwritten/spanner/google-cloud-spanner-executor/src/worker-proxy.ts

Lines changed: 14 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,8 @@ import * as protoLoader from '@grpc/proto-loader';
1919
import yargs from 'yargs';
2020
import * as path from 'path';
2121
import * as fs from 'fs';
22-
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
23-
import {OTLPTraceExporter} from '@opentelemetry/exporter-trace-otlp-grpc';
24-
import {Resource} from '@opentelemetry/resources';
25-
import {ATTR_SERVICE_NAME} from '@opentelemetry/semantic-conventions';
26-
import {
27-
BatchSpanProcessor,
28-
TraceIdRatioBasedSampler,
29-
} from '@opentelemetry/sdk-trace-base';
3022
import {CloudExecutorImpl} from './cloud-executor-impl';
23+
import {CloudUtil} from './cloud-util';
3124
import {HealthImplementation} from 'grpc-health-check';
3225
import {ReflectionService} from '@grpc/reflection';
3326

@@ -42,8 +35,6 @@ const OPTION_CERTIFICATE = 'cert';
4235
const OPTION_SERVICE_KEY_FILE = 'service_key_file';
4336
const OPTION_USE_PLAIN_TEXT_CHANNEL = 'use_plain_text_channel';
4437
const OPTION_ENABLE_GRPC_FAULT_INJECTOR = 'enable_grpc_fault_injector';
45-
const OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO =
46-
'multiplexed_session_operations_ratio';
4738

4839
/**
4940
* Acts as a proxy server that forwards incoming gRPC requests to the underlying
@@ -54,10 +45,10 @@ export class WorkerProxy {
5445
public static proxyPort = 0;
5546
public static cert = '';
5647
public static serviceKeyFile = '';
57-
public static multiplexedSessionOperationsRatio = 0.0;
48+
5849
public static usePlainTextChannel = false;
5950
public static enableGrpcFaultInjector = false;
60-
public static openTelemetrySdk: NodeTracerProvider;
51+
public static openTelemetrySdk: any;
6152

6253
public static readonly PROJECT_ID = 'spanner-cloud-systest';
6354
public static readonly CLOUD_TRACE_ENDPOINT =
@@ -67,34 +58,6 @@ export class WorkerProxy {
6758
private static readonly MAX_PORT = 65535;
6859
private static readonly TRACE_SAMPLING_RATE = 0.01;
6960

70-
/**
71-
* Sets up the OpenTelemetry SDK
72-
*/
73-
public static async setupOpenTelemetrySdk(): Promise<NodeTracerProvider> {
74-
const exporterConfig = {
75-
url: this.CLOUD_TRACE_ENDPOINT,
76-
headers: {
77-
'x-goog-api-key': this.PROJECT_ID,
78-
},
79-
};
80-
81-
// TODO: Replace with TraceExporter once the Cloud Trace Exporter is compatible with OpenTelemetry v2.
82-
// This will allow us to utilize `exporterConfig` and emit traces to GCP.
83-
84-
const traceExporter = new OTLPTraceExporter(exporterConfig);
85-
86-
const provider = new NodeTracerProvider({
87-
resource: new Resource({
88-
[ATTR_SERVICE_NAME]: 'spanner-node-worker-proxy',
89-
}) as any,
90-
sampler: new TraceIdRatioBasedSampler(this.TRACE_SAMPLING_RATE),
91-
spanProcessors: [new BatchSpanProcessor(traceExporter as any)],
92-
});
93-
94-
provider.register();
95-
return provider;
96-
}
97-
9861
/**
9962
* Parses and builds the command line options for the worker proxy.
10063
*/
@@ -126,10 +89,6 @@ export class WorkerProxy {
12689
type: 'boolean',
12790
description: 'Enable grpc fault injector in cloud client executor.',
12891
});
129-
parser.option(OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO, {
130-
type: 'number',
131-
description: 'Ratio of operations to use multiplexed sessions.',
132-
});
13392

13493
try {
13594
return parser.parseSync();
@@ -187,7 +146,7 @@ export class WorkerProxy {
187146
!!commandLine[OPTION_ENABLE_GRPC_FAULT_INJECTOR];
188147

189148
// Setup the OpenTelemetry for tracing
190-
this.openTelemetrySdk = await this.setupOpenTelemetrySdk();
149+
this.openTelemetrySdk = await CloudUtil.setupOpenTelemetrySdk();
191150

192151
// Check if proto file exists
193152
if (!fs.existsSync(PROTO_PATH)) {
@@ -270,6 +229,16 @@ export class WorkerProxy {
270229

271230
process.on('SIGTERM', shutdown);
272231
process.on('SIGINT', shutdown);
232+
233+
process.on('uncaughtException', err => {
234+
console.error('Uncaught Exception:', err);
235+
shutdown();
236+
});
237+
238+
process.on('unhandledRejection', reason => {
239+
console.error('Unhandled Rejection:', reason);
240+
shutdown();
241+
});
273242
}
274243
}
275244
if (require.main === module) {

0 commit comments

Comments
 (0)