1616
1717package org .cloudfoundry .reactor .util ;
1818
19+ import com .fasterxml .jackson .core .JsonPointer ;
1920import com .fasterxml .jackson .core .JsonProcessingException ;
21+ import com .fasterxml .jackson .databind .JsonMappingException ;
2022import com .fasterxml .jackson .databind .ObjectMapper ;
2123import com .fasterxml .jackson .databind .annotation .JsonSerialize ;
2224import io .netty .handler .codec .http .HttpHeaderNames ;
2527import io .netty .handler .codec .json .JsonObjectDecoder ;
2628import java .nio .charset .Charset ;
2729import java .util .function .BiFunction ;
30+ import org .cloudfoundry .reactor .util .JsonDeserializationProblemHandler .RetryException ;
2831import org .reactivestreams .Publisher ;
32+ import org .slf4j .Logger ;
33+ import org .slf4j .LoggerFactory ;
2934import reactor .core .Exceptions ;
3035import reactor .core .publisher .Mono ;
3136import reactor .netty .ByteBufFlux ;
3540public final class JsonCodec {
3641
3742 private static final int MAX_PAYLOAD_SIZE = 100 * 1024 * 1024 ;
43+ private static final Logger LOGGER = LoggerFactory .getLogger ("cloudfoundry-client" );
3844
3945 public static <T > Mono <T > decode (
4046 ObjectMapper objectMapper , ByteBufFlux responseBody , Class <T > responseType ) {
@@ -43,17 +49,57 @@ public static <T> Mono<T> decode(
4349 .asByteArray ()
4450 .map (
4551 payload -> {
46- try {
47- return objectMapper .readValue (payload , responseType );
48- } catch (Throwable t ) {
49- throw new JsonParsingException (
50- t .getMessage (),
51- t ,
52- new String (payload , Charset .defaultCharset ()));
53- }
52+ return decodeInternal (objectMapper , payload , responseType );
5453 });
5554 }
5655
56+ // decode the payload into an object.
57+ // If that fails, check the exception and retry.
58+ private static <T > T decodeInternal (
59+ ObjectMapper objectMapper , byte [] payload , Class <T > responseType ) {
60+ try {
61+ return objectMapper .readValue (payload , responseType );
62+ } catch (Throwable t ) {
63+ T result = null ;
64+ if (t instanceof JsonMappingException ) {
65+ Throwable cause = t .getCause ();
66+ if (cause != null ) {
67+ if (cause instanceof RetryException ) {
68+ result = retry (objectMapper , responseType , (RetryException ) cause , payload );
69+ if (result != null ) {
70+ return result ;
71+ }
72+ }
73+ }
74+ }
75+ if (t instanceof RetryException ) {
76+ result = retry (objectMapper , responseType , (RetryException ) t , payload );
77+ if (result != null ) {
78+ return result ;
79+ }
80+ }
81+ String payloadStr = new String (payload , Charset .defaultCharset ());
82+ LOGGER .warn ("unable to parse the following json message:" );
83+ LOGGER .warn (payloadStr );
84+ throw new JsonParsingException (t .getMessage (), t , payloadStr );
85+ }
86+ }
87+
88+ // drop the problematic element from the payload and try again.
89+ private static <T > T retry (
90+ ObjectMapper objectMapper ,
91+ Class <T > responseType ,
92+ RetryException cause ,
93+ byte [] payload ) {
94+ String property = cause .getProperty ();
95+ JsonPointer pointer = cause .getJsonPointer (); // JsonPointer.compile(pointerStr);
96+ payload = JsonDeserializationProblemHandler .dropProperty (payload , property , pointer );
97+ if (payload != null ) {
98+ return decodeInternal (objectMapper , payload , responseType );
99+ }
100+ return null ;
101+ }
102+
57103 public static void setDecodeHeaders (HttpHeaders httpHeaders ) {
58104 httpHeaders .set (HttpHeaderNames .ACCEPT , HttpHeaderValues .APPLICATION_JSON );
59105 }
0 commit comments