Skip to content

Commit ac6fe49

Browse files
committed
GROOVY-11933: Mesh async and http builder via Flow.Publisher
1 parent 203dc76 commit ac6fe49

6 files changed

Lines changed: 1356 additions & 4 deletions

File tree

src/main/java/groovy/concurrent/AwaitableAdapterRegistry.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package groovy.concurrent;
2020

2121
import org.apache.groovy.runtime.async.AsyncSupport;
22+
import org.apache.groovy.runtime.async.FlowPublisherAdapter;
2223
import org.apache.groovy.runtime.async.GroovyPromise;
2324

2425
import java.util.Iterator;
@@ -34,11 +35,13 @@
3435
* Central registry for {@link AwaitableAdapter} instances.
3536
* <p>
3637
* On class-load, adapters are discovered via {@link ServiceLoader} from
37-
* {@code META-INF/services/groovy.concurrent.AwaitableAdapter}. A built-in
38-
* adapter is always present as the lowest-priority fallback, handling:
38+
* {@code META-INF/services/groovy.concurrent.AwaitableAdapter}. Two built-in
39+
* adapters are always present after SPI-loaded ones:
3940
* <ul>
40-
* <li>{@link CompletableFuture} and {@link CompletionStage}</li>
41-
* <li>{@link Future} (adapted via a blocking wrapper)</li>
41+
* <li>{@link FlowPublisherAdapter} for {@link java.util.concurrent.Flow.Publisher}
42+
* (single-value {@code await} and multi-value {@code for await}).</li>
43+
* <li>A {@code Future} fallback handling {@link CompletableFuture},
44+
* {@link CompletionStage}, and {@link Future} (via a blocking wrapper).</li>
4245
* </ul>
4346
*
4447
* @since 6.0.0
@@ -64,6 +67,9 @@ public final class AwaitableAdapterRegistry {
6467
} catch (Throwable ignored) {
6568
// ServiceLoader failure — continue with built-in adapter only
6669
}
70+
// Built-in JDK Flow.Publisher adapter (after SPI, before Future fallback).
71+
// SPI adapters take precedence so framework-specific types resolve first.
72+
adapters.add(new FlowPublisherAdapter());
6773
// Built-in fallback adapter (lowest priority)
6874
adapters.add(new BuiltInAdapter());
6975
}
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.groovy.runtime.async;
20+
21+
import groovy.concurrent.Awaitable;
22+
import groovy.concurrent.AwaitableAdapter;
23+
24+
import java.util.Iterator;
25+
import java.util.NoSuchElementException;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.Flow;
28+
import java.util.concurrent.LinkedBlockingQueue;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicReference;
31+
32+
/**
33+
* Adapter for {@link java.util.concurrent.Flow.Publisher}, the JDK's built-in
34+
* Reactive Streams type. Enables:
35+
* <ul>
36+
* <li>{@code await publisher} — completes with the first {@code onNext}
37+
* value, then cancels the subscription. Completes with {@code null} if
38+
* the publisher signals {@code onComplete} without emitting.</li>
39+
* <li>{@code for await (item in publisher)} — iterates over emitted values
40+
* with bounded backpressure (see {@link #DEFAULT_BATCH_SIZE}).</li>
41+
* </ul>
42+
* <p>
43+
* Conformance:
44+
* <ul>
45+
* <li>Reactive Streams §2.13: {@code onNext(null)} is treated as a protocol
46+
* violation and surfaced as a {@link NullPointerException}.</li>
47+
* <li>A second {@code onSubscribe} after the first is cancelled.</li>
48+
* <li>Signals after a terminal {@code onError}/{@code onComplete} are ignored.</li>
49+
* </ul>
50+
* <p>
51+
* This adapter is registered as the lowest-priority built-in (after SPI-loaded
52+
* adapters) so framework-specific adapters (Reactor, RxJava) take precedence
53+
* for their concrete types.
54+
*
55+
* @since 6.0.0
56+
*/
57+
public final class FlowPublisherAdapter implements AwaitableAdapter {
58+
59+
/**
60+
* Default request batch size for {@code for await} iteration. Chosen as a
61+
* compromise between throughput (larger = fewer {@code request()} calls)
62+
* and memory (larger = bigger in-flight buffer). Override per-call by
63+
* wrapping with a custom adapter if needed.
64+
*/
65+
public static final int DEFAULT_BATCH_SIZE = 32;
66+
67+
@Override
68+
public boolean supportsAwaitable(Class<?> type) {
69+
return Flow.Publisher.class.isAssignableFrom(type);
70+
}
71+
72+
@Override
73+
public boolean supportsIterable(Class<?> type) {
74+
return Flow.Publisher.class.isAssignableFrom(type);
75+
}
76+
77+
@Override
78+
@SuppressWarnings("unchecked")
79+
public <T> Awaitable<T> toAwaitable(Object source) {
80+
if (!(source instanceof Flow.Publisher<?>)) {
81+
throw new IllegalArgumentException("Cannot convert to Awaitable: " + source.getClass());
82+
}
83+
return (Awaitable<T>) publisherToAwaitable((Flow.Publisher<Object>) source);
84+
}
85+
86+
@Override
87+
@SuppressWarnings("unchecked")
88+
public <T> Iterable<T> toBlockingIterable(Object source) {
89+
if (!(source instanceof Flow.Publisher<?>)) {
90+
throw new IllegalArgumentException("Cannot convert to Iterable: " + source.getClass());
91+
}
92+
return (Iterable<T>) publisherToBlockingIterable((Flow.Publisher<Object>) source, DEFAULT_BATCH_SIZE);
93+
}
94+
95+
// ---- single-value: first onNext wins ---------------------------------
96+
97+
private static <T> Awaitable<T> publisherToAwaitable(Flow.Publisher<T> publisher) {
98+
CompletableFuture<T> cf = new CompletableFuture<>();
99+
AtomicReference<Flow.Subscription> subRef = new AtomicReference<>();
100+
publisher.subscribe(new Flow.Subscriber<T>() {
101+
@Override
102+
public void onSubscribe(Flow.Subscription s) {
103+
if (!subRef.compareAndSet(null, s)) {
104+
s.cancel(); // duplicate onSubscribe — cancel the second
105+
return;
106+
}
107+
s.request(1);
108+
}
109+
110+
@Override
111+
public void onNext(T item) {
112+
if (cf.isDone()) return;
113+
if (item == null) {
114+
cf.completeExceptionally(new NullPointerException(
115+
"Flow.Publisher onNext received null (Reactive Streams §2.13)"));
116+
} else {
117+
cf.complete(item);
118+
}
119+
Flow.Subscription s = subRef.getAndSet(null);
120+
if (s != null) s.cancel();
121+
}
122+
123+
@Override
124+
public void onError(Throwable t) {
125+
if (cf.isDone()) return;
126+
cf.completeExceptionally(t);
127+
subRef.set(null);
128+
}
129+
130+
@Override
131+
public void onComplete() {
132+
if (cf.isDone()) return;
133+
cf.complete(null); // empty publisher → null result
134+
subRef.set(null);
135+
}
136+
});
137+
return GroovyPromise.of(cf);
138+
}
139+
140+
// ---- multi-value: blocking iterable with bounded backpressure --------
141+
142+
private static <T> Iterable<T> publisherToBlockingIterable(Flow.Publisher<T> publisher, int batchSize) {
143+
return new PublisherBlockingIterable<>(publisher, batchSize);
144+
}
145+
146+
/**
147+
* Blocking {@link Iterable} backed by a {@link Flow.Publisher} with bounded
148+
* backpressure. Implements {@link AutoCloseable} so {@code for await}
149+
* cleanup (via {@code AsyncSupport.closeIterable}) cancels the subscription
150+
* on early break.
151+
*/
152+
private static final class PublisherBlockingIterable<T> implements Iterable<T>, AutoCloseable {
153+
private final Flow.Publisher<T> publisher;
154+
private final int batchSize;
155+
private final AtomicReference<QueueSubscriber<T>> active = new AtomicReference<>();
156+
157+
PublisherBlockingIterable(Flow.Publisher<T> publisher, int batchSize) {
158+
this.publisher = publisher;
159+
this.batchSize = Math.max(1, batchSize);
160+
}
161+
162+
@Override
163+
public Iterator<T> iterator() {
164+
QueueSubscriber<T> sub = new QueueSubscriber<>(batchSize);
165+
if (!active.compareAndSet(null, sub)) {
166+
throw new IllegalStateException(
167+
"PublisherBlockingIterable is single-use; iterator() was already called");
168+
}
169+
publisher.subscribe(sub);
170+
return sub;
171+
}
172+
173+
@Override
174+
public void close() {
175+
QueueSubscriber<T> sub = active.get();
176+
if (sub != null) sub.cancel();
177+
}
178+
}
179+
180+
/**
181+
* A {@link Flow.Subscriber} that buffers signals into a blocking queue and
182+
* exposes them as an {@link Iterator}. Demand is replenished in halves of
183+
* the batch size to overlap consumption with production.
184+
*/
185+
private static final class QueueSubscriber<T> implements Flow.Subscriber<T>, Iterator<T> {
186+
private static final Object COMPLETE = new Object();
187+
private static final Object ERROR = new Object();
188+
189+
private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
190+
private final int batchSize;
191+
private final int replenishThreshold;
192+
private final AtomicReference<Flow.Subscription> subRef = new AtomicReference<>();
193+
private final AtomicBoolean cancelled = new AtomicBoolean();
194+
195+
private Object next;
196+
private boolean hasNext;
197+
private boolean terminated;
198+
private Throwable error;
199+
private int consumedSinceRequest;
200+
201+
QueueSubscriber(int batchSize) {
202+
this.batchSize = batchSize;
203+
this.replenishThreshold = Math.max(1, batchSize / 2);
204+
}
205+
206+
@Override
207+
public void onSubscribe(Flow.Subscription s) {
208+
if (!subRef.compareAndSet(null, s)) {
209+
s.cancel();
210+
return;
211+
}
212+
// Handle cancel-before-subscribe race: cancel() may have already
213+
// run and enqueued the sentinel; dispose of this late subscription.
214+
if (cancelled.get() && subRef.compareAndSet(s, null)) {
215+
s.cancel();
216+
return;
217+
}
218+
s.request(batchSize);
219+
}
220+
221+
@Override
222+
public void onNext(T item) {
223+
if (cancelled.get()) return;
224+
if (item == null) {
225+
onError(new NullPointerException(
226+
"Flow.Publisher onNext received null (Reactive Streams §2.13)"));
227+
return;
228+
}
229+
queue.offer(item);
230+
}
231+
232+
@Override
233+
public void onError(Throwable t) {
234+
if (cancelled.get()) return;
235+
this.error = t;
236+
queue.offer(ERROR);
237+
subRef.set(null);
238+
}
239+
240+
@Override
241+
public void onComplete() {
242+
if (cancelled.get()) return;
243+
queue.offer(COMPLETE);
244+
subRef.set(null);
245+
}
246+
247+
void cancel() {
248+
if (!cancelled.compareAndSet(false, true)) return;
249+
try {
250+
Flow.Subscription s = subRef.getAndSet(null);
251+
if (s != null) s.cancel();
252+
} finally {
253+
// Wake any thread blocked in queue.take() inside hasNext().
254+
// The queue's happens-before also publishes the cancelled flag.
255+
queue.offer(COMPLETE);
256+
}
257+
}
258+
259+
@Override
260+
public boolean hasNext() {
261+
if (hasNext) return true;
262+
if (terminated) return false;
263+
try {
264+
Object item = queue.take();
265+
if (item == COMPLETE) {
266+
terminated = true;
267+
return false;
268+
}
269+
if (item == ERROR) {
270+
terminated = true;
271+
if (error instanceof RuntimeException re) throw re;
272+
if (error instanceof Error err) throw err;
273+
throw new RuntimeException(error);
274+
}
275+
next = item;
276+
hasNext = true;
277+
replenishIfNeeded();
278+
return true;
279+
} catch (InterruptedException e) {
280+
Thread.currentThread().interrupt();
281+
cancel();
282+
return false;
283+
}
284+
}
285+
286+
@Override
287+
@SuppressWarnings("unchecked")
288+
public T next() {
289+
if (!hasNext()) throw new NoSuchElementException();
290+
T value = (T) next;
291+
next = null;
292+
hasNext = false;
293+
return value;
294+
}
295+
296+
private void replenishIfNeeded() {
297+
consumedSinceRequest++;
298+
if (consumedSinceRequest >= replenishThreshold) {
299+
Flow.Subscription s = subRef.get();
300+
if (s != null) {
301+
int toRequest = consumedSinceRequest;
302+
consumedSinceRequest = 0;
303+
s.request(toRequest);
304+
}
305+
}
306+
}
307+
}
308+
}

0 commit comments

Comments
 (0)