mirror of
https://github.com/jlengrand/helidon.git
synced 2026-03-10 08:21:17 +00:00
Support for a max payload limit on client requests (#2491)
* Initial support for new server config property max-payload-size to limit the payload sizes in client requests, with or without the use of chunked encoding. Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com> * Fixed issue forwarding chunks and cleaned up reset logic. Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com> * Removed unnecessary log instruction. Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com> * Restored logging properties file. Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com> * Do not closed the channel when a 413 response is in flight. Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com> * Webserver config doc update. Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>
This commit is contained in:
committed by
GitHub
parent
c701e38737
commit
7425747cc5
@@ -84,6 +84,7 @@ Available socket configuration options:
|
||||
|`name` |`@default` for default socket |String |Name used for named sockets, to support additional server sockets (and their named routing)
|
||||
|`enabled` |`true` |boolean |A socket can be disabled through configuration, in which case it is never opened
|
||||
|`max-chunk-size` | `8192` |int |Maximal size of a chunk to read from incoming requests
|
||||
|`max-payload-size` | `-1` |long |Maximal size of a request payload in bytes. If exceeded a 413 error is returned. Negative value means no limit.
|
||||
|`validate-headers` |`true` |boolean |Whether to validate header names, if they contain illegal characters.
|
||||
|`initial-buffer-size` |`128` |int |Initial size of buffer used to parse HTTP line and headers
|
||||
|`tls` |{nbsp} |Object |Configuration of TLS, please see our TLS example in repository
|
||||
|
||||
@@ -50,6 +50,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
|
||||
import static io.helidon.webserver.HttpInitializer.CERTIFICATE_NAME;
|
||||
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
|
||||
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
|
||||
import static io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
|
||||
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||
|
||||
/**
|
||||
@@ -68,23 +69,34 @@ public class ForwardingHandler extends SimpleChannelInboundHandler<Object> {
|
||||
private final SSLEngine sslEngine;
|
||||
private final Queue<ReferenceHoldingQueue<DataChunk>> queues;
|
||||
private final HttpRequestDecoder httpRequestDecoder;
|
||||
private final long maxPayloadSize;
|
||||
|
||||
// this field is always accessed by the very same thread; as such, it doesn't need to be
|
||||
// concurrency aware
|
||||
private RequestContext requestContext;
|
||||
|
||||
private boolean isWebSocketUpgrade = false;
|
||||
private boolean isWebSocketUpgrade;
|
||||
private long actualPayloadSize;
|
||||
private boolean ignorePayload;
|
||||
|
||||
ForwardingHandler(Routing routing,
|
||||
NettyWebServer webServer,
|
||||
SSLEngine sslEngine,
|
||||
Queue<ReferenceHoldingQueue<DataChunk>> queues,
|
||||
HttpRequestDecoder httpRequestDecoder) {
|
||||
HttpRequestDecoder httpRequestDecoder,
|
||||
long maxPayloadSize) {
|
||||
this.routing = routing;
|
||||
this.webServer = webServer;
|
||||
this.sslEngine = sslEngine;
|
||||
this.queues = queues;
|
||||
this.httpRequestDecoder = httpRequestDecoder;
|
||||
this.maxPayloadSize = maxPayloadSize;
|
||||
}
|
||||
|
||||
private void reset() {
|
||||
isWebSocketUpgrade = false;
|
||||
actualPayloadSize = 0L;
|
||||
ignorePayload = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -103,13 +115,19 @@ public class ForwardingHandler extends SimpleChannelInboundHandler<Object> {
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("checkstyle:methodlength")
|
||||
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
|
||||
LOGGER.fine(() -> String.format("[Handler: %s] Received object: %s", System.identityHashCode(this), msg.getClass()));
|
||||
LOGGER.fine(() -> String.format("[Handler: %s, Channel: %s] Received object: %s",
|
||||
System.identityHashCode(this), System.identityHashCode(ctx.channel()), msg.getClass()));
|
||||
|
||||
if (msg instanceof HttpRequest) {
|
||||
|
||||
// Turns off auto read
|
||||
ctx.channel().config().setAutoRead(false);
|
||||
|
||||
// Reset internal state on new request
|
||||
reset();
|
||||
|
||||
// Check that HTTP decoding was successful or return 400
|
||||
HttpRequest request = (HttpRequest) msg;
|
||||
try {
|
||||
checkDecoderResult(request);
|
||||
@@ -117,9 +135,13 @@ public class ForwardingHandler extends SimpleChannelInboundHandler<Object> {
|
||||
send400BadRequest(ctx, e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
// Certificate management
|
||||
request.headers().remove(Http.Header.X_HELIDON_CN);
|
||||
Optional.ofNullable(ctx.channel().attr(CERTIFICATE_NAME).get())
|
||||
.ifPresent(name -> request.headers().set(Http.Header.X_HELIDON_CN, name));
|
||||
|
||||
// Queue, context and publisher creation
|
||||
ReferenceHoldingQueue<DataChunk> queue = new ReferenceHoldingQueue<>();
|
||||
queues.add(queue);
|
||||
requestContext = new RequestContext(new HttpRequestScopedPublisher(ctx, queue), request);
|
||||
@@ -137,6 +159,28 @@ public class ForwardingHandler extends SimpleChannelInboundHandler<Object> {
|
||||
return;
|
||||
}
|
||||
|
||||
// If context length is greater than maximum allowed, return 413 response
|
||||
if (maxPayloadSize >= 0) {
|
||||
String contentLength = request.headers().get(Http.Header.CONTENT_LENGTH);
|
||||
if (contentLength != null) {
|
||||
try {
|
||||
long value = Long.parseLong(contentLength);
|
||||
if (value > maxPayloadSize) {
|
||||
LOGGER.fine(() -> String.format("[Handler: %s, Channel: %s] Payload length over max %d > %d",
|
||||
System.identityHashCode(this), System.identityHashCode(ctx.channel()),
|
||||
value, maxPayloadSize));
|
||||
ignorePayload = true;
|
||||
send413PayloadTooLarge(ctx);
|
||||
return;
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
send400BadRequest(ctx, Http.Header.CONTENT_LENGTH + " header is invalid");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create response and handler for its completion
|
||||
BareResponseImpl bareResponse =
|
||||
new BareResponseImpl(ctx, request, publisherRef::isCompleted, Thread.currentThread(), requestId);
|
||||
bareResponse.whenCompleted()
|
||||
@@ -206,8 +250,22 @@ public class ForwardingHandler extends SimpleChannelInboundHandler<Object> {
|
||||
// payload is not consumed and the response is already sent; we must close the connection
|
||||
LOGGER.finer(() -> "Closing connection because request payload was not consumed; method: " + method);
|
||||
ctx.close();
|
||||
} else {
|
||||
requestContext.publisher().emit(content);
|
||||
} else if (!ignorePayload) {
|
||||
// Check payload size if a maximum has been set
|
||||
if (maxPayloadSize >= 0) {
|
||||
actualPayloadSize += content.readableBytes();
|
||||
if (actualPayloadSize > maxPayloadSize) {
|
||||
LOGGER.fine(() -> String.format("[Handler: %s, Channel: %s] Chunked Payload over max %d > %d",
|
||||
System.identityHashCode(this), System.identityHashCode(ctx.channel()),
|
||||
actualPayloadSize, maxPayloadSize));
|
||||
ignorePayload = true;
|
||||
send413PayloadTooLarge(ctx);
|
||||
} else {
|
||||
requestContext.publisher().emit(content);
|
||||
}
|
||||
} else {
|
||||
requestContext.publisher().emit(content);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,6 +292,11 @@ public class ForwardingHandler extends SimpleChannelInboundHandler<Object> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that an HTTP message has been successfully decoded.
|
||||
*
|
||||
* @param request The HTTP request.
|
||||
*/
|
||||
private static void checkDecoderResult(HttpRequest request) {
|
||||
DecoderResult decoderResult = request.decoderResult();
|
||||
if (decoderResult.isFailure()) {
|
||||
@@ -293,6 +356,16 @@ public class ForwardingHandler extends SimpleChannelInboundHandler<Object> {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a 413 (Payload Too Large) response.
|
||||
*
|
||||
* @param ctx Channel context.
|
||||
*/
|
||||
private void send413PayloadTooLarge(ChannelHandlerContext ctx) {
|
||||
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, REQUEST_ENTITY_TOO_LARGE);
|
||||
ctx.write(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
if (requestContext != null) {
|
||||
|
||||
@@ -132,7 +132,8 @@ class HttpInitializer extends ChannelInitializer<SocketChannel> {
|
||||
}
|
||||
|
||||
// Helidon's forwarding handler
|
||||
p.addLast(new ForwardingHandler(routing, webServer, sslEngine, queues, requestDecoder));
|
||||
p.addLast(new ForwardingHandler(routing, webServer, sslEngine, queues,
|
||||
requestDecoder, soConfig.maxPayloadSize()));
|
||||
|
||||
// Cleanup queues as part of event loop
|
||||
ch.eventLoop().execute(this::clearQueues);
|
||||
|
||||
@@ -177,6 +177,7 @@ class ServerBasicConfig implements ServerConfiguration {
|
||||
private final boolean validateHeaders;
|
||||
private final int initialBufferSize;
|
||||
private final boolean enableCompression;
|
||||
private final long maxPayloadSize;
|
||||
|
||||
/**
|
||||
* Creates new instance.
|
||||
@@ -195,6 +196,7 @@ class ServerBasicConfig implements ServerConfiguration {
|
||||
this.validateHeaders = builder.validateHeaders();
|
||||
this.initialBufferSize = builder.initialBufferSize();
|
||||
this.enableCompression = builder.enableCompression();
|
||||
this.maxPayloadSize = builder.maxPayloadSize();
|
||||
|
||||
WebServerTls webServerTls = builder.tlsConfig();
|
||||
if (webServerTls.enabled()) {
|
||||
@@ -287,5 +289,10 @@ class ServerBasicConfig implements ServerConfiguration {
|
||||
public boolean enableCompression() {
|
||||
return enableCompression;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long maxPayloadSize() {
|
||||
return maxPayloadSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -488,6 +488,17 @@ public interface ServerConfiguration extends SocketConfiguration {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure maximum client payload size.
|
||||
* @param size maximum payload size
|
||||
* @return an updated builder
|
||||
*/
|
||||
@Override
|
||||
public Builder maxPayloadSize(long size) {
|
||||
this.defaultSocketBuilder.maxPayloadSize(size);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure experimental features.
|
||||
* @param experimental experimental configuration
|
||||
|
||||
@@ -175,6 +175,16 @@ public interface SocketConfiguration {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Maximum size allowed for an HTTP payload in a client request. A negative
|
||||
* value indicates that there is no maximum set.
|
||||
*
|
||||
* @return maximum payload size
|
||||
*/
|
||||
default long maxPayloadSize() {
|
||||
return -1L;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initial size of the buffer used to parse HTTP line and headers.
|
||||
*
|
||||
@@ -338,6 +348,15 @@ public interface SocketConfiguration {
|
||||
*/
|
||||
B enableCompression(boolean value);
|
||||
|
||||
/**
|
||||
* Set a maximum payload size for a client request. Can prevent DoS
|
||||
* attacks.
|
||||
*
|
||||
* @param size maximum payload size
|
||||
* @return this builder
|
||||
*/
|
||||
B maxPayloadSize(long size);
|
||||
|
||||
/**
|
||||
* Update this socket configuration from a {@link io.helidon.config.Config}.
|
||||
*
|
||||
@@ -351,6 +370,7 @@ public interface SocketConfiguration {
|
||||
config.get("backlog").asInt().ifPresent(this::backlog);
|
||||
config.get("max-header-size").asInt().ifPresent(this::maxHeaderSize);
|
||||
config.get("max-initial-line-length").asInt().ifPresent(this::maxInitialLineLength);
|
||||
config.get("max-payload-size").asInt().ifPresent(this::maxPayloadSize);
|
||||
|
||||
DeprecatedConfig.get(config, "timeout-millis", "timeout")
|
||||
.asInt()
|
||||
@@ -411,6 +431,7 @@ public interface SocketConfiguration {
|
||||
private boolean validateHeaders = true;
|
||||
private int initialBufferSize = 128;
|
||||
private boolean enableCompression = false;
|
||||
private long maxPayloadSize = -1;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
@@ -578,6 +599,12 @@ public interface SocketConfiguration {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Builder maxPayloadSize(long size) {
|
||||
this.maxPayloadSize = size;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure a socket name, to bind named routings to.
|
||||
*
|
||||
@@ -716,5 +743,9 @@ public interface SocketConfiguration {
|
||||
boolean enableCompression() {
|
||||
return enableCompression;
|
||||
}
|
||||
|
||||
long maxPayloadSize() {
|
||||
return maxPayloadSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -577,6 +577,12 @@ public interface WebServer {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Builder maxPayloadSize(long size) {
|
||||
configurationBuilder.maxPayloadSize(size);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure experimental features.
|
||||
* @param experimental experimental configuration
|
||||
|
||||
@@ -0,0 +1,225 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Oracle and/or its affiliates.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.helidon.webserver;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.concurrent.Flow;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import io.helidon.common.http.DataChunk;
|
||||
import io.helidon.common.http.Http;
|
||||
import io.helidon.common.http.MediaType;
|
||||
import io.helidon.webclient.WebClient;
|
||||
import io.helidon.webclient.WebClientRequestBuilder;
|
||||
import io.helidon.webclient.WebClientResponse;
|
||||
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
|
||||
/**
|
||||
* Tests support for compression in the webserver.
|
||||
*/
|
||||
public class MaxPayloadSizeTest {
|
||||
private static final Logger LOGGER = Logger.getLogger(MaxPayloadSizeTest.class.getName());
|
||||
|
||||
private static final long MAX_PAYLOAD_SIZE = 128L;
|
||||
private static String PAYLOAD = new String(new char[1024]).replace('\0', 'A');
|
||||
|
||||
private static WebServer webServer;
|
||||
private static WebClient webClient;
|
||||
|
||||
@BeforeAll
|
||||
public static void startServer() throws Exception {
|
||||
startServer(0);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void close() throws Exception {
|
||||
if (webServer != null) {
|
||||
webServer.shutdown()
|
||||
.toCompletableFuture()
|
||||
.get(10, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the Web Server
|
||||
*
|
||||
* @param port the port on which to start the server; if less than 1,
|
||||
* the port is dynamically selected
|
||||
* @throws Exception in case of an error
|
||||
*/
|
||||
private static void startServer(int port) throws Exception {
|
||||
webServer = WebServer.builder()
|
||||
.port(port)
|
||||
.routing(Routing.builder()
|
||||
.post("/maxpayload", (req, res) -> {
|
||||
req.content().subscribe(new Flow.Subscriber<>() {
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
subscription.request(Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(DataChunk item) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
res.status(Http.Status.INTERNAL_SERVER_ERROR_500).send(t.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
res.status(Http.Status.OK_200).send();
|
||||
}
|
||||
});
|
||||
})
|
||||
.build())
|
||||
.maxPayloadSize(MAX_PAYLOAD_SIZE)
|
||||
.build()
|
||||
.start()
|
||||
.toCompletableFuture()
|
||||
.get(10, TimeUnit.SECONDS);
|
||||
|
||||
webClient = WebClient.builder()
|
||||
.baseUri("http://localhost:" + webServer.port())
|
||||
.validateHeaders(false)
|
||||
.keepAlive(true)
|
||||
.build();
|
||||
|
||||
LOGGER.info("Started server at: https://localhost:" + webServer.port());
|
||||
}
|
||||
|
||||
/**
|
||||
* If content length is greater than max, a 413 must be returned. No actual
|
||||
* payload in this case.
|
||||
*/
|
||||
@Test
|
||||
public void testContentLengthExceeded() {
|
||||
WebClientRequestBuilder builder = webClient.post();
|
||||
builder.headers().add("Content-Length", "512"); // over max
|
||||
WebClientResponse response = builder.path("/maxpayload")
|
||||
.contentType(MediaType.APPLICATION_OCTET_STREAM)
|
||||
.request()
|
||||
.await(5, TimeUnit.SECONDS);
|
||||
assertThat(response.status().code(), is(Http.Status.REQUEST_ENTITY_TOO_LARGE_413.code()));
|
||||
}
|
||||
|
||||
/**
|
||||
* If content length is greater than max, a 413 must be returned.
|
||||
*/
|
||||
@Test
|
||||
public void testContentLengthExceededWithPayload() {
|
||||
WebClientRequestBuilder builder = webClient.post();
|
||||
WebClientResponse response = builder.path("/maxpayload")
|
||||
.contentType(MediaType.APPLICATION_OCTET_STREAM)
|
||||
.submit(PAYLOAD)
|
||||
.await(5, TimeUnit.SECONDS);
|
||||
assertThat(response.status().code(), is(Http.Status.REQUEST_ENTITY_TOO_LARGE_413.code()));
|
||||
}
|
||||
|
||||
/**
|
||||
* If actual payload length is greater than max when using chunked encoding, a 413
|
||||
* must be returned.
|
||||
*/
|
||||
@Test
|
||||
public void testActualLengthExceededWithPayload() {
|
||||
WebClientRequestBuilder builder = webClient.post();
|
||||
WebClientResponse response = builder.path("/maxpayload")
|
||||
.contentType(MediaType.APPLICATION_OCTET_STREAM)
|
||||
.submit(new PayloadPublisher(PAYLOAD, 3))
|
||||
.await(5, TimeUnit.SECONDS);
|
||||
assertThat(response.status().code(), is(Http.Status.REQUEST_ENTITY_TOO_LARGE_413.code()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests mixed requests, some that exceed limits, others that do not.
|
||||
*/
|
||||
@Test
|
||||
public void testMixedGoodAndBadPayloads() {
|
||||
WebClientRequestBuilder builder = webClient.post();
|
||||
WebClientResponse response = builder.path("/maxpayload")
|
||||
.contentType(MediaType.APPLICATION_OCTET_STREAM)
|
||||
.submit(PAYLOAD.substring(0, 100))
|
||||
.await(5, TimeUnit.SECONDS);
|
||||
assertThat(response.status().code(), is(Http.Status.OK_200.code()));
|
||||
|
||||
builder = webClient.post();
|
||||
response = builder.path("/maxpayload")
|
||||
.contentType(MediaType.APPLICATION_OCTET_STREAM)
|
||||
.submit(new PayloadPublisher(PAYLOAD, 1))
|
||||
.await(5, TimeUnit.SECONDS);
|
||||
assertThat(response.status().code(), is(Http.Status.REQUEST_ENTITY_TOO_LARGE_413.code()));
|
||||
|
||||
builder = webClient.post();
|
||||
response = builder.path("/maxpayload")
|
||||
.contentType(MediaType.APPLICATION_OCTET_STREAM)
|
||||
.submit(PAYLOAD.substring(0, (int) MAX_PAYLOAD_SIZE))
|
||||
.await(5, TimeUnit.SECONDS);
|
||||
assertThat(response.status().code(), is(Http.Status.OK_200.code()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Publishes the same chunk multiple times. If number of times is greater than
|
||||
* one, Helidon will send content using chunked encoding.
|
||||
*/
|
||||
static class PayloadPublisher implements Flow.Publisher<DataChunk> {
|
||||
|
||||
private final String chunk;
|
||||
private int count;
|
||||
|
||||
PayloadPublisher(String chunk, int count) {
|
||||
this.chunk = chunk;
|
||||
this.count = Math.max(count, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(Flow.Subscriber<? super DataChunk> subscriber) {
|
||||
subscriber.onSubscribe(new Flow.Subscription() {
|
||||
private int nestedonNext = 0;
|
||||
private boolean onCompleteCalled = false;
|
||||
|
||||
@Override
|
||||
public void request(long n) {
|
||||
if (n != 1) {
|
||||
throw new UnsupportedOperationException("Request count must be 1");
|
||||
}
|
||||
if (count-- > 0) {
|
||||
nestedonNext++;
|
||||
subscriber.onNext(DataChunk.create(chunk.getBytes(Charset.defaultCharset())));
|
||||
nestedonNext--;
|
||||
}
|
||||
if (count <= 0 && nestedonNext == 0 && !onCompleteCalled) {
|
||||
onCompleteCalled = true;
|
||||
subscriber.onComplete();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user