Configuration changes (#1357)

* Removal of project Reactor.
* Change support refactoring - in progress.
* Fixed value resolving, MP uses latest config.
* Longer timeouts for slow tests.

Signed-off-by: Tomas Langer <tomas.langer@oracle.com>
This commit is contained in:
Tomas Langer
2020-02-20 11:41:09 +01:00
committed by GitHub
parent 92c5340752
commit 206206a5c8
56 changed files with 382 additions and 1364 deletions

View File

@@ -1313,153 +1313,6 @@ Copyright 2017-2018 The OpenTracing Authors
Apache License, Version 2.0
--------------------------------------------
=======================
reactor-core 3.1.5-RELEASE
=======================
Non-Blocking Reactive Foundation for the JVM (io.projectreactor:reactor-core)
Copyright (c) Pivotal Software Inc, All Rights Reserved.
Copyright The Netty Project
Copyright the original author or authors.
Apache License, Version 2.0
--------------------------------------------
"reactive-streams" 1.0.1 (org.reactivestreams:reactive-streams)
To the extent possible under law, the person who associated CC0 with this
code has waived all copyright and related or neighboring rights to this code.
Statement of Purpose The laws of most jurisdictions throughout the world
automatically confer exclusive Copyright and Related Rights (defined below) upon
the creator and subsequent owner(s) (each and all, an "owner") of an original
work of authorship and/or a database (each, a "Work").
Certain owners wish to permanently relinquish those rights to a Work for the
purpose of contributing to a commons of creative, cultural and scientific works
("Commons") that the public can reliably and without fear of later claims of
infringement build upon, modify, incorporate in other works, reuse and
redistribute as freely as possible in any form whatsoever and for any purposes,
including without limitation commercial purposes. These owners may contribute to
the Commons to promote the ideal of a free culture and the further production of
creative, cultural and scientific works, or to gain reputation or greater
distribution for their Work in part through the use and efforts of others.
For these and/or other purposes and motivations, and without any expectation of
additional consideration or compensation, the person associating CC0 with a Work
(the "Affirmer"), to the extent that he or she is an owner of Copyright and
Related Rights in the Work, voluntarily elects to apply CC0 to the Work and
publicly distribute the Work under its terms, with knowledge of his or her
Copyright and Related Rights in the Work and the meaning and intended legal
effect of CC0 on those rights.
1. Copyright and Related Rights. A Work made available under CC0 may be
protected by copyright and related or neighboring rights ("Copyright and Related
Rights"). Copyright and Related Rights include, but are not limited to, the
following:
the right to reproduce, adapt, distribute, perform, display, communicate, and
translate a Work; moral rights retained by the original author(s) and/or
performer(s); publicity and privacy rights pertaining to a person's image or
likeness depicted in a Work; rights protecting against unfair competition in
regards to a Work, subject to the limitations in paragraph 4(a), below; rights
protecting the extraction, dissemination, use and reuse of data in a Work;
database rights (such as those arising under Directive 96/9/EC of the European
Parliament and of the Council of 11 March 1996 on the legal protection of
databases, and under any national implementation thereof, including any amended
or successor version of such directive); and other similar, equivalent or
corresponding rights throughout the world based on applicable law or treaty, and
any national implementations thereof. 2. Waiver. To the greatest extent
permitted by, but not in contravention of, applicable law, Affirmer hereby
overtly, fully, permanently, irrevocably and unconditionally waives, abandons,
and surrenders all of Affirmer's Copyright and Related Rights and associated
claims and causes of action, whether now known or unknown (including existing as
well as future claims and causes of action), in the Work (i) in all territories
worldwide, (ii) for the maximum duration provided by applicable law or treaty
(including future time extensions), (iii) in any current or future medium and
for any number of copies, and (iv) for any purpose whatsoever, including without
limitation commercial, advertising or promotional purposes (the "Waiver").
Affirmer makes the Waiver for the benefit of each member of the public at large
and to the detriment of Affirmer's heirs and successors, fully intending that
such Waiver shall not be subject to revocation, rescission, cancellation,
termination, or any other legal or equitable action to disrupt the quiet
enjoyment of the Work by the public as contemplated by Affirmer's express
Statement of Purpose.
3. Public License Fallback. Should any part of the Waiver for any reason be
judged legally invalid or ineffective under applicable law, then the Waiver
shall be preserved to the maximum extent permitted taking into account
Affirmer's express Statement of Purpose. In addition, to the extent the Waiver
is so judged Affirmer hereby grants to each affected person a royalty-free, non
transferable, non sublicensable, non exclusive, irrevocable and unconditional
license to exercise Affirmer's Copyright and Related Rights in the Work (i) in
all territories worldwide, (ii) for the maximum duration provided by applicable
law or treaty (including future time extensions), (iii) in any current or future
medium and for any number of copies, and (iv) for any purpose whatsoever,
including without limitation commercial, advertising or promotional purposes
(the "License"). The License shall be deemed effective as of the date CC0 was
applied by Affirmer to the Work. Should any part of the License for any reason
be judged legally invalid or ineffective under applicable law, such partial
invalidity or ineffectiveness shall not invalidate the remainder of the License,
and in such case Affirmer hereby affirms that he or she will not (i) exercise
any of his or her remaining Copyright and Related Rights in the Work or (ii)
assert any associated claims and causes of action with respect to the Work, in
either case contrary to Affirmer's express Statement of Purpose.
4. Limitations and Disclaimers.
No trademark or patent rights held by Affirmer are waived, abandoned,
surrendered, licensed or otherwise affected by this document. Affirmer offers
the Work as-is and makes no representations or warranties of any kind concerning
the Work, express, implied, statutory or otherwise, including without limitation
warranties of title, merchantability, fitness for a particular purpose, non
infringement, or the absence of latent or other defects, accuracy, or the
present or absence of errors, whether or not discoverable, all to the greatest
extent permissible under applicable law. Affirmer disclaims responsibility for
clearing rights of other persons that may apply to the Work or any use thereof,
including without limitation any person's Copyright and Related Rights in the
Work. Further, Affirmer disclaims responsibility for obtaining any necessary
consents, permissions or other rights required for any use of the Work. Affirmer
understands and acknowledges that Creative Commons is not a party to this
document and has no duty or obligation with respect to this CC0 or use of the
Work.
--------------------------------------------
"SLF4J API Module" 1.7.12 (org.slf4j:slf4j-api)
Copyright (c) QOS.ch
The MIT License SPDX short identifier: MIT
Further resources on the MIT License Copyright
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
--------------------------------------------
"FindBugs-jsr305" 3.0.2 (com.google.code.findbugs:jsr305)
Copyright (c) 2007-2009, JSR305 expert group
Apache License, Version 2.0
--------------------------------------------
"org.jetbrains.kotlin:kotlin-stdlib" 1.1.51 (org.jetbrains.kotlin:kotlin-stdlib)
Copyright JetBrains s.r.o.
Apache License, Version 2.0
--------------------------------------------
"IntelliJ IDEA Annotations" 13.0 (org.jetbrains:annotations)
Copyright Sascha Weinreuter
Copyright JetBrains s.r.o.
Apache License, Version 2.0
--------------------------------------------
=======================
SLF4J 1.7.15

View File

@@ -78,7 +78,6 @@ Notable changes:
- Upgrade Netty to 4.1.45 [1309](https://github.com/oracle/helidon/pull/1309)
- Upgrade Google libraries for Google login provider. [1229](https://github.com/oracle/helidon/pull/1229)
- Upgrade H2, HikariCP, Jedis, OCI SDK versions [1198](https://github.com/oracle/helidon/pull/1198)
- Upgrade reactor to 3.3.1-RELEASE [1235](https://github.com/oracle/helidon/pull/1235)
- Upgrade to FT 2.0.2 and Failsafe 2.2.3 [1204](https://github.com/oracle/helidon/pull/1204)
@@ -99,6 +98,7 @@ Here are the details:
- Removed `io.helidon.common.OptionalHelper`, please use methods of `java.util.Optional`
- Removed `io.helidon.common.StackWalker`, please use `java.lang.StackWalker`
- Removed `io.helidon.common.InputStreamHelper`, please use `java.io.InputStream` methods
- Removed dependency on Project Reactor
#### Tracing
- We have upgraded to OpenTracing version 0.33.0 that is not backward compatible, the following breaking changes exist

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2018, 2020 Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2018, 2020 Oracle and/or its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -66,10 +66,6 @@
<groupId>io.helidon.common</groupId>
<artifactId>helidon-common-mapper</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,9 +33,9 @@ import java.util.logging.Logger;
* {@link #submit(Object)}. In other words, whenever the source of chunks sends data, the same thread is used to deliver the data
* to the subscriber.
* <p>
* Standard publisher implementations (such as {@link SubmissionPublisher} or Reactor Flux would use the same thread as
* {@link Subscription#request(long)} was called on to deliver the chunk when the data are already available; this implementation
* however strictly uses the originating thread.<p>
* Standard publisher implementations (such as {@link java.util.concurrent.SubmissionPublisher} or Reactor Flux would use
* the same thread as {@link Subscription#request(long)} was called on to deliver the chunk when the data are already available;
* this implementation however strictly uses the originating thread.<p>
* In order to be able to achieve such behavior, this publisher provides hooks on subscription methods: {@link #hookOnCancel()}
* and {@link #hookOnRequested(long, long)}.
* </p>

View File

@@ -1,196 +0,0 @@
/*
* Copyright (c) 2018, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;
import java.util.concurrent.Flow;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
/**
* Static utility methods for converting between Helidon reactive API {@link Flow} and
* <a href="http://www.reactive-streams.org/">reactive-streams</a>.
*
* @deprecated This class will be removed in the next major release.
*/
@Deprecated
public final class ReactiveStreamsAdapter {
// uninstantiable
private ReactiveStreamsAdapter() {
}
/**
* Return a {@link Flow.Publisher} from a {@link
* org.reactivestreams.Publisher}.
*
* @param publisher the source Publisher to convert
* @param <T> the type of the publisher
* @return a {@link Flow.Publisher}
*/
public static <T> Flow.Publisher<T> publisherToFlow(final Publisher<T> publisher) {
return new FlowPublisher<>(publisher);
}
/**
* Return a {@link org.reactivestreams.Publisher} from a {@link
* Flow.Publisher}.
*
* @param publisher the source Publisher to convert
* @param <T> the type of the publisher
* @return a {@link reactor.core.publisher.Flux}
*/
public static <T> Flux<T> publisherFromFlow(Flow.Publisher<T> publisher) {
return new ReactiveStreamsPublisher<>(publisher);
}
/**
* Return a {@link Flow.Subscriber} from a {@link
* org.reactivestreams.Subscriber}.
* @param <T> the type of the subscriber
* @param subscriber the source Subscriber to convert
* @return a {@link org.reactivestreams.Subscriber}
*/
public static <T> Flow.Subscriber<T> subscriberToFlow(final Subscriber<T> subscriber) {
return new FlowSubscriber<>(subscriber);
}
/**
* Return a {@link org.reactivestreams.Subscriber} from a {@link
* Flow.Subscriber}.
* @param <T> the type of the subscriber
* @param subscriber the source Subscriber to convert
* @return a {@link Flow.Subscriber}
*/
public static <T> Subscriber<T> subscriberFromFlow(final Flow.Subscriber<T> subscriber) {
return new ReactiveStreamsSubscriber<>(subscriber);
}
private static class ReactiveStreamsPublisher<T> extends Flux<T> {
private final Flow.Publisher<T> pub;
private ReactiveStreamsPublisher(Flow.Publisher<T> pub) {
this.pub = pub;
}
@Override
public void subscribe(final CoreSubscriber<? super T> actual) {
pub.subscribe(new FlowSubscriber<>(actual));
}
}
private static class FlowPublisher<T> implements Flow.Publisher<T> {
private final Publisher<T> pub;
private FlowPublisher(Publisher<T> pub) {
this.pub = pub;
}
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
pub.subscribe(new ReactiveStreamsSubscriber<>(subscriber));
}
}
private static class ReactiveStreamsSubscriber<T> implements CoreSubscriber<T>, Flow.Subscription {
private final Flow.Subscriber<? super T> subscriber;
private Subscription subscription;
ReactiveStreamsSubscriber(Flow.Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onSubscribe(final Subscription s) {
this.subscription = s;
subscriber.onSubscribe(this);
}
@Override
public void onNext(T o) {
subscriber.onNext(o);
}
@Override
public void onError(Throwable t) {
subscriber.onError(t);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
@Override
public void request(long n) {
subscription.request(n);
}
@Override
public void cancel() {
subscription.cancel();
}
}
private static class FlowSubscriber<T> implements Flow.Subscriber<T>, Subscription {
private final Subscriber<? super T> s;
private Flow.Subscription subscription;
FlowSubscriber(Subscriber<? super T> s) {
this.s = s;
}
@Override
public void onSubscribe(final Flow.Subscription subscription) {
this.subscription = subscription;
s.onSubscribe(this);
}
@Override
public void onNext(T o) {
s.onNext(o);
}
@Override
public void onError(Throwable throwable) {
s.onError(throwable);
}
@Override
public void onComplete() {
s.onComplete();
}
@Override
public void request(long n) {
subscription.request(n);
}
@Override
public void cancel() {
subscription.cancel();
}
}
}

View File

@@ -1,225 +0,0 @@
/*
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
* A {@link Flow.Publisher} that asynchronously issues submitted (non-null) items to current subscribers until it is closed.
*
* @param <T> the published item type
* @deprecated This class will be removed in the next major release.
*/
@Deprecated
public class SubmissionPublisher<T> implements Flow.Publisher<T>, AutoCloseable {
static {
// prevent reactor from using "console" logging
System.setProperty("reactor.logging.fallback", "JDK");
}
private final Flux<T> flux;
private final FluxSink<T> sink;
private final AtomicInteger numberOfSubscribers;
/**
* Creates a new SubmissionPublisher using the given Executor for
* async delivery to subscribers, with the given maximum buffer size
* for each subscriber.
*
* @param executor the executor to use for async delivery,
* supporting creation of at least one independent thread
* @param maxBufferCapacity the maximum capacity for each
* subscriber's buffer
* @throws IllegalArgumentException if maxBufferCapacity not
* positive
*/
public SubmissionPublisher(Executor executor, int maxBufferCapacity){
this(Schedulers.fromExecutor(executor), maxBufferCapacity);
}
/**
* Creates a new SubmissionPublisher using the current thread for delivery
* to subscribers, with the given maximum buffer size for each subscriber.
* @param maxBufferCapacity the maximum capacity for each
* subscriber's buffer
* @throws IllegalArgumentException if maxBufferCapacity not
*/
public SubmissionPublisher(int maxBufferCapacity){
this(Schedulers.immediate(), maxBufferCapacity);
}
/**
* Creates a new SubmissionPublisher using the current thread for delivery
* to subscribers, with maximum buffer capacity of
* {@link Flow#defaultBufferSize}.
*/
public SubmissionPublisher(){
this(Schedulers.immediate(), Flow.defaultBufferSize());
}
private SubmissionPublisher(Scheduler scheduler, int maxBufferCapacity) {
if (scheduler == null){
throw new NullPointerException();
}
if (maxBufferCapacity <= 0){
throw new IllegalArgumentException("capacity must be positive");
}
UnicastProcessor<T> processor = UnicastProcessor.<T>create();
sink = processor.sink();
flux = processor
.publish(maxBufferCapacity)
.autoConnect()
.subscribeOn(Schedulers.immediate())
.publishOn(scheduler);
numberOfSubscribers = new AtomicInteger(0);
}
/**
* Adds the given Subscriber.
*
* @param subscriber the subscriber
* @throws NullPointerException if subscriber is null
*/
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
flux.subscribe(new OnCancelSubscriber<>(subscriber, this::onCancel));
numberOfSubscribers.incrementAndGet();
}
private void onCancel(Subscription subscription){
numberOfSubscribers.decrementAndGet();
}
/**
* Publishes the given item to each current subscriber.
*
* @param item the (non-null) item to publish
* @throws NullPointerException if item is null
*/
public void submit(T item) {
if (item == null) throw new NullPointerException();
sink.next(item);
}
/**
* Publishes the given item to each current subscriber.
*
* @param item the (non-null) item to publish
* @param onDrop not supported in the current implementation
* @throws NullPointerException if item is null
*/
public void offer(T item, BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
submit(item);
}
/**
* Unless already closed, issues {@link
* Flow.Subscriber#onError(Throwable) onError} signals to current
* subscribers with the given error, and disallows subsequent
* attempts to publish. Future subscribers also receive the given
* error. Upon return, this method does <em>NOT</em> guarantee
* that all subscribers have yet completed.
*
* @param error the {@code onError} argument sent to subscribers
* @throws NullPointerException if error is null
*/
public void closeExceptionally(Throwable error) {
if (error == null){
throw new NullPointerException();
}
sink.error(error);
}
@Override
public void close() {
sink.complete();
}
/**
* Returns the number of current subscribers.
*
* @return the number of current subscribers
*/
public int getNumberOfSubscribers() {
return numberOfSubscribers.get();
}
/**
* Returns true if this publisher has any subscribers.
*
* @return true if this publisher has any subscribers
*/
public boolean hasSubscribers() {
return getNumberOfSubscribers() > 0;
}
private static class OnCancelSubscriber<T> implements Subscriber<T> {
private final Subscriber<T> delegate;
private final Consumer<Subscription> onCancel;
OnCancelSubscriber(Flow.Subscriber<T> subscriber, Consumer<Subscription> onCancel) {
this.delegate = ReactiveStreamsAdapter.subscriberFromFlow(subscriber);
this.onCancel = onCancel;
}
@Override
public void onSubscribe(final Subscription s) {
delegate.onSubscribe(new Subscription() {
@Override
public void request(long n) {
s.request(n);
}
@Override
public void cancel() {
s.cancel();
onCancel.accept(s);
}
});
}
@Override
public void onNext(T t) {
delegate.onNext(t);
}
@Override
public void onError(Throwable t) {
delegate.onError(t);
}
@Override
public void onComplete() {
delegate.onComplete();
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,8 +18,6 @@
* Common reactive library for Helidon projects.
*
* @see java.util.concurrent.Flow
* @see io.helidon.common.reactive.ReactiveStreamsAdapter
* @see io.helidon.common.reactive.OutputStreamPublisher
* @see io.helidon.common.reactive.SubmissionPublisher
*/
package io.helidon.common.reactive;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,8 +19,6 @@
*/
module io.helidon.common.reactive {
requires java.logging;
requires org.reactivestreams;
requires reactor.core;
requires io.helidon.common;
requires io.helidon.common.mapper;

View File

@@ -1,81 +0,0 @@
/*
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;
import java.util.concurrent.ForkJoinPool;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class SubmissionPublisherTest {
// TODO, do a test with requestMax.
// TODO use latches intead of thread.sleep
@Test
public void testMultipleSubscribers() throws InterruptedException{
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TestSubscriber<String> s1 = new TestSubscriber<>();
publisher.subscribe(s1);
s1.request1();
TestSubscriber<String> s2 = new TestSubscriber<>();
publisher.subscribe(s2);
s2.request1();
publisher.submit("hello");
publisher.close();
Thread.sleep(1000);
assertThat(s1.getItems().size(), is(1));
assertThat(s2.getItems().size(), is(1));
assertThat(publisher.getNumberOfSubscribers(), is(2));
}
@Test
public void testNoReplayElements() throws InterruptedException{
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TestSubscriber<String> s1 = new TestSubscriber<>();
publisher.subscribe(s1);
s1.request1();
publisher.submit("hello");
Thread.sleep(1000);
assertThat(s1.getItems().size(), is(1));
TestSubscriber<String> s2 = new TestSubscriber<>();
publisher.subscribe(s2);
s2.request1();
Thread.sleep(1000);
assertThat(s2.getItems().size(), is(0));
assertThat(publisher.getNumberOfSubscribers(), is(2));
}
@Test
public void testNoReplayElementsWithParallePublisher() throws InterruptedException{
SubmissionPublisher<String> publisher = new SubmissionPublisher<>(ForkJoinPool.commonPool(), 256);
TestSubscriber<String> s1 = new TestSubscriber<>();
publisher.subscribe(s1);
s1.request1();
publisher.submit("hello");
Thread.sleep(1000);
assertThat(s1.getItems().size(), is(1));
TestSubscriber<String> s2 = new TestSubscriber<>();
publisher.subscribe(s2);
s2.request1();
Thread.sleep(1000);
assertThat(s2.getItems().size(), is(0));
assertThat(publisher.getNumberOfSubscribers(), is(2));
}
}

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2017, 2020 Oracle and/or its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -54,10 +54,6 @@
<groupId>io.helidon.common</groupId>
<artifactId>helidon-common-media-type</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.config</groupId>
<artifactId>microprofile-config-api</artifactId>

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -87,10 +88,6 @@ abstract class AbstractConfigImpl implements Config, org.eclipse.microprofile.co
context = new NodeContextImpl();
}
ConfigMapperManager mapperManager() {
return mapperManager;
}
/**
* Returns a {@code String} value as {@link Optional} of configuration node if the node a leaf or "hybrid" node.
* Returns a {@link Optional#empty() empty} if the node is {@link Type#MISSING} type or if the node does not contain a direct
@@ -186,7 +183,6 @@ abstract class AbstractConfigImpl implements Config, org.eclipse.microprofile.co
/*
* MicroProfile Config methods
*/
@SuppressWarnings("unchecked")
@Override
public <T> T getValue(String propertyName, Class<T> propertyType) {
Config config = latestConfig.get();
@@ -300,7 +296,7 @@ abstract class AbstractConfigImpl implements Config, org.eclipse.microprofile.co
*/
private void waitForSubscription(long timeout, TimeUnit unit) {
CountDownLatch subscribeLatch = new CountDownLatch(1);
subscriber = new Flow.Subscriber<ConfigDiff>() {
subscriber = new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
@@ -337,13 +333,28 @@ abstract class AbstractConfigImpl implements Config, org.eclipse.microprofile.co
.get(AbstractConfigImpl.this.key);
}
ConfigFactory factory() {
return factory;
}
@Override
public Flow.Publisher<Config> changes() {
return changesPublisher;
public void onChange(Consumer<Config> onChangeConsumer) {
changesPublisher.subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
// I want all
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Config item) {
onChangeConsumer.accept(item);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
}
void initMp() {
@@ -383,13 +394,7 @@ abstract class AbstractConfigImpl implements Config, org.eclipse.microprofile.co
}
}
// see #1183
// this must be changed, as otherwise we would not get changes in MP
// and why did it work when the MpConfig was a separate implementation?
// onChange(newConfig -> {
// // this does not work - seems that when there is more than one subscriber, the events are not delivered
// latestConfig.set(newConfig);
// });
onChange(latestConfig::set);
}
/**

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,11 +26,11 @@ import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.internal.ConfigThreadFactory;
import io.helidon.config.internal.ConfigUtils;
import io.helidon.config.internal.ObjectNodeImpl;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -836,68 +836,6 @@ public interface Config {
//
// config changes
//
/**
* Allows to subscribe on change on whole Config as well as on particular Config node.
* <p>
* A user can subscribe on root Config node and than will be notified on any change of Configuration.
* You can also subscribe on any sub-node, i.e. you will receive notification events just about sub-configuration.
* No matter how much the sub-configuration has changed you will receive just one notification event that is associated
* with a node you are subscribed on.
* If a user subscribes on older instance of Config and ones has already been published the last one is automatically
* submitted to new-subscriber.
* <p>
* The {@code Config} notification support is based on {@link ConfigSource#changes() ConfigSource changes support}.
* <p>
* Method {@link Flow.Subscriber#onError(Throwable)} is never called.
* Method {@link Flow.Subscriber#onComplete()} is called in case an associated
* {@link ConfigSource#changes() ConfigSource's changes Publisher} signals {@code onComplete} as well.
* <p>
* Note: It does not matter what instance version of Config (related to single {@link Builder} initialization)
* a user subscribes on. It is enough to subscribe just on single (e.g. on the first) Config instance.
* There is no added value to subscribe again on new Config instance.
*
* @return {@link Flow.Publisher} to be subscribed in. Never returns {@code null}.
* @see Config#onChange(Function)
*/
@Deprecated
default Flow.Publisher<Config> changes() {
return Flow.Subscriber::onComplete;
}
/**
* Directly subscribes {@code onNextFunction} function on change on whole Config or on particular Config node.
* <p>
* It automatically creates {@link ConfigHelper#subscriber(Function) Flow.Subscriber} that will
* delegate {@link Flow.Subscriber#onNext(Object)} to specified {@code onNextFunction} function.
* Created subscriber automatically {@link Flow.Subscription#request(long) requests} {@link Long#MAX_VALUE all events}
* in it's {@link Flow.Subscriber#onSubscribe(Flow.Subscription)} method.
* Function {@code onNextFunction} returns {@code false} in case user wants to {@link Flow.Subscription#cancel() cancel}
* current subscription.
* <p>
* A user can subscribe on root Config node and than will be notified on any change of Configuration.
* You can also subscribe on any sub-node, i.e. you will receive notification events just about sub-configuration.
* No matter how much the sub-configuration has changed you will receive just one notification event that is associated
* with a node you are subscribed on.
* If a user subscribes on older instance of Config and ones has already been published the last one is automatically
* submitted to new-subscriber.
* <p>
* The {@code Config} notification support is based on {@link ConfigSource#changes() ConfigSource changes support}.
* <p>
* Note: It does not matter what instance version of Config (related to single {@link Builder} initialization)
* a user subscribes on. It is enough to subscribe just on single (e.g. on the first) Config instance.
* There is no added value to subscribe again on new Config instance.
*
* @param onNextFunction {@link Flow.Subscriber#onNext(Object)} functionality
* @see Config#changes()
* @see ConfigHelper#subscriber(Function)
* @deprecated use {@link #onChange(Consumer)} instead
*/
@Deprecated
default void onChange(Function<Config, Boolean> onNextFunction) {
changes().subscribe(ConfigHelper.subscriber(onNextFunction));
}
/**
* Register a {@link Consumer} that is invoked each time a change occurs on whole Config or on a particular Config node.
* <p>
@@ -915,11 +853,7 @@ public interface Config {
* @param onChangeConsumer consumer invoked on change
*/
default void onChange(Consumer<Config> onChangeConsumer) {
// temporary workaround before change support is replaced by one not using Flow API
changes().subscribe(ConfigHelper.subscriber(config -> {
onChangeConsumer.accept(config);
return true;
}));
// no-op
}
/**
@@ -946,14 +880,14 @@ public interface Config {
* @see Config#key()
*/
interface Key extends Comparable<Key> {
/**
* Returns instance of Key that represents key of parent config node.
* <p>
* If the key represents root config node it returns {@code null}.
* If the key represents root config node it throws an exception.
*
* @return key that represents key of parent config node.
* @see #isRoot()
* @throws java.lang.IllegalStateException in case you attempt to call this method on a root node
*/
Key parent();
@@ -964,9 +898,7 @@ public interface Config {
* @return {@code true} in case the key represents root node, otherwise {@code false}.
* @see #parent()
*/
default boolean isRoot() {
return parent() == null;
}
boolean isRoot();
/**
* Returns the name of Config node.
@@ -1019,8 +951,7 @@ public interface Config {
}
StringBuilder sb = new StringBuilder();
char[] chars = name.toCharArray();
for (int i = 0; i < chars.length; i++) {
char ch = chars[i];
for (char ch : chars) {
if (ch == '~') {
sb.append("~0");
} else if (ch == '.') {
@@ -1549,8 +1480,9 @@ public interface Config {
* #addFilter(Function)} or {@link #addFilter(Supplier)} method.
* <p>
* Registered provider's {@link Function#apply(Object)} method is called every time the new Config is created. Eg. when
* this builder's {@link #build} method creates the {@link Config} or when the new {@link Config#changes() change event}
* is fired with new Config instance with its own filter instance is created.
* this builder's {@link #build} method creates the {@link Config} or when the new
* {@link Config#onChange(java.util.function.Consumer)} is fired with new Config instance with its own filter instance
* is created.
*
* @param configFilterProvider a config filter provider as a function of {@link Config} to {@link ConfigFilter}
* @return an updated builder instance
@@ -1568,7 +1500,8 @@ public interface Config {
* #addFilter(Function)} or {@link #addFilter(Supplier)} method.
* <p>
* Registered provider's {@link Function#apply(Object)} method is called every time the new Config is created. Eg. when
* this builder's {@link #build} method creates the {@link Config} or when the new {@link Config#changes() change event}
* this builder's {@link #build} method creates the {@link Config} or when the new
* {@link Config#onChange(java.util.function.Consumer)} change event
* is fired with new Config instance with its own filter instance is created.
*
* @param configFilterSupplier a config filter provider as a supplier of a function of {@link Config} to {@link
@@ -1606,34 +1539,32 @@ public interface Config {
Builder disableCaching();
/**
* Specifies "observe-on" {@link Executor} to be used by {@link Config#changes()} to deliver new Config instance.
* Specifies "observe-on" {@link Executor} to be used by {@link Config#onChange(java.util.function.Consumer)} to deliver
* new Config instance.
* Executor is also used to process reloading of config from appropriate {@link ConfigSource#changes() source}.
* <p>
* By default dedicated thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are available is used.
*
* @param changesExecutor the executor to use for async delivery of {@link Config#changes()} events
* @param changesExecutor the executor to use for async delivery of {@link Config#onChange(java.util.function.Consumer)}
* @return an updated builder instance
* @see #changesMaxBuffer(int)
* @see Config#changes()
* @see Config#onChange(Function)
* @see ConfigSource#changes()
* @see Config#onChange(java.util.function.Consumer)
*/
Builder changesExecutor(Executor changesExecutor);
/**
* Specifies maximum capacity for each subscriber's buffer to be used by by {@link Config#changes()}
* to deliver new Config instance.
* Specifies maximum capacity for each subscriber's buffer to be used by
* {@link Config#onChange(java.util.function.Consumer)} to deliver new Config instance.
* <p>
* By default {@link Flow#defaultBufferSize()} is used.
* <p>
* Note: Not consumed events will be dropped off.
*
* @param changesMaxBuffer the maximum capacity for each subscriber's buffer of {@link Config#changes()} events.
* @param changesMaxBuffer the maximum capacity for each subscriber's buffer of new config events.
* @return an updated builder instance
* @see #changesExecutor(Executor)
* @see Config#changes()
* @see Config#onChange(Function)
* @see Config#onChange(java.util.function.Consumer)
*/
Builder changesMaxBuffer(int changesMaxBuffer);

View File

@@ -57,7 +57,6 @@ import static java.util.Objects.requireNonNull;
*/
public final class ConfigSources {
private static final String SOURCES_KEY = "sources";
static final String DEFAULT_MAP_NAME = "map";
static final String DEFAULT_PROPERTIES_NAME = "properties";
@@ -757,9 +756,7 @@ public final class ConfigSources {
Map<String, String> result = new HashMap<>();
System.getProperties().stringPropertyNames()
.forEach(it -> {
result.put(it, System.getProperty(it));
});
.forEach(it -> result.put(it, System.getProperty(it)));
return result;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -202,99 +202,103 @@ public class MpConfigProviderResolver extends ConfigProviderResolver {
this.delegate.set(newDelegate);
}
private io.helidon.config.Config getCurrent() {
return delegate.get().context().last();
}
@Override
public Instant timestamp() {
return delegate.get().timestamp();
return getCurrent().timestamp();
}
@Override
public Key key() {
return delegate.get().key();
return getCurrent().key();
}
@Override
public io.helidon.config.Config get(Key key) {
return delegate.get().get(key);
return getCurrent().get(key);
}
@Override
public io.helidon.config.Config detach() {
return delegate.get().detach();
return getCurrent().detach();
}
@Override
public Type type() {
return delegate.get().type();
return getCurrent().type();
}
@Override
public boolean hasValue() {
return delegate.get().hasValue();
return getCurrent().hasValue();
}
@Override
public Stream<io.helidon.config.Config> traverse(Predicate<io.helidon.config.Config> predicate) {
return delegate.get().traverse();
return getCurrent().traverse();
}
@Override
public <T> T convert(Class<T> type, String value) throws ConfigMappingException {
return delegate.get().convert(type, value);
return getCurrent().convert(type, value);
}
@Override
public <T> ConfigValue<T> as(GenericType<T> genericType) {
return delegate.get().as(genericType);
return getCurrent().as(genericType);
}
@Override
public <T> ConfigValue<T> as(Class<T> type) {
return delegate.get().as(type);
return getCurrent().as(type);
}
@Override
public <T> ConfigValue<T> as(Function<io.helidon.config.Config, T> mapper) {
return delegate.get().as(mapper);
return getCurrent().as(mapper);
}
@Override
public <T> ConfigValue<List<T>> asList(Class<T> type) throws ConfigMappingException {
return delegate.get().asList(type);
return getCurrent().asList(type);
}
@Override
public <T> ConfigValue<List<T>> asList(Function<io.helidon.config.Config, T> mapper) throws ConfigMappingException {
return delegate.get().asList(mapper);
return getCurrent().asList(mapper);
}
@Override
public ConfigValue<List<io.helidon.config.Config>> asNodeList() throws ConfigMappingException {
return delegate.get().asNodeList();
return getCurrent().asNodeList();
}
@Override
public ConfigValue<Map<String, String>> asMap() throws MissingValueException {
return delegate.get().asMap();
return getCurrent().asMap();
}
@Override
public <T> T getValue(String propertyName, Class<T> propertyType) {
return ((Config) delegate.get()).getValue(propertyName, propertyType);
return ((Config) getCurrent()).getValue(propertyName, propertyType);
}
@Override
public <T> Optional<T> getOptionalValue(String propertyName, Class<T> propertyType) {
return ((Config) delegate.get()).getOptionalValue(propertyName, propertyType);
return ((Config) getCurrent()).getOptionalValue(propertyName, propertyType);
}
@Override
public Iterable<String> getPropertyNames() {
return ((Config) delegate.get()).getPropertyNames();
return ((Config) getCurrent()).getPropertyNames();
}
@Override
public Iterable<ConfigSource> getConfigSources() {
return ((Config) delegate.get()).getConfigSources();
return ((Config) getCurrent()).getConfigSources();
}
/**
@@ -303,7 +307,7 @@ public class MpConfigProviderResolver extends ConfigProviderResolver {
* @return the instance backing this config delegate
*/
public Config delegate() {
return (Config) delegate.get();
return (Config) getCurrent();
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,13 +27,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.internal.ConfigKeyImpl;
import io.helidon.config.internal.ConfigUtils;
import io.helidon.config.internal.ObjectNodeBuilderImpl;
@@ -305,8 +305,7 @@ class ProviderImpl implements Config.Context {
* The publisher repeats the last change event with any new subscriber.
*
* @return {@link Flow.Publisher} to be subscribed in. Never returns {@code null}.
* @see Config#changes()
* @see Config#onChange(Function)
* @see Config#onChange(java.util.function.Consumer)
*/
public Flow.Publisher<ConfigDiff> changes() {
return changesPublisher;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,16 +36,9 @@ public class ConfigKeyImpl implements Config.Key {
private ConfigKeyImpl(ConfigKeyImpl parent, String name) {
Objects.requireNonNull(name, "name is mandatory");
if (parent != null && name.equals("")) {
if (parent.isRoot()) {
//collapse roots to single one
parent = null;
} else {
throw new IllegalArgumentException("Illegal empty name. Only root can be empty.");
}
}
if (name.contains(".")) {
throw new IllegalArgumentException("Illegal key token format. Dot character ('.') is not ");
throw new IllegalArgumentException("Illegal key token format. Dot character ('.') is not supported.");
}
this.parent = parent;
@@ -64,14 +57,22 @@ public class ConfigKeyImpl implements Config.Key {
}
this.name = Config.Key.unescapeName(name);
this.path = Collections.unmodifiableList(path);
fullKey = fullSB.toString();
this.fullKey = fullSB.toString();
}
@Override
public ConfigKeyImpl parent() {
if (null == parent) {
throw new IllegalStateException("Attempting to get parent of a root node. Guard by isRoot instead");
}
return parent;
}
@Override
public boolean isRoot() {
return (null == parent);
}
/**
* Creates new root instance of ConfigKeyImpl.
*
@@ -125,6 +126,9 @@ public class ConfigKeyImpl implements Config.Key {
private ConfigKeyImpl child(List<String> path) {
ConfigKeyImpl result = this;
for (String name : path) {
if ("".equals(name)) {
continue;
}
result = new ConfigKeyImpl(result, name);
}
return result;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,11 +31,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.ConfigException;
import io.helidon.config.ConfigHelper;
import io.helidon.config.spi.PollingStrategy;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,13 +22,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.ConfigHelper;
import io.helidon.config.spi.PollingStrategy;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,9 +16,9 @@
package io.helidon.config.internal;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
@@ -113,10 +113,12 @@ public class ValueResolvingFilter implements ConfigFilter {
private static final String REGEX_BACKSLASH = "\\\\(?=\\$\\{([^}]+)\\})";
private static final Pattern PATTERN_BACKSLASH = Pattern.compile(REGEX_BACKSLASH);
// I only care about unresolved key happening within the same thread
private static final ThreadLocal<Set<Config.Key>> UNRESOLVED_KEYS = ThreadLocal.withInitial(HashSet::new);
private Config root;
private Optional<Boolean> failOnMissingReferenceSetting = Optional.empty();
private boolean failOnMissingReference = false;
private final Collection<Config.Key> unresolvedKeys = new ConcurrentLinkedQueue<>();
/**
* Creates an instance of filter with the specified behavior on missing
@@ -155,11 +157,11 @@ public class ValueResolvingFilter implements ConfigFilter {
@Override
public String apply(Config.Key key, String stringValue) {
if (unresolvedKeys.contains(key)) {
if (!UNRESOLVED_KEYS.get().add(key)) {
UNRESOLVED_KEYS.get().clear();
throw new IllegalStateException("Recursive update");
}
try {
unresolvedKeys.add(key);
return format(stringValue);
} catch (MissingValueException e) {
if (failOnMissingReference) {
@@ -169,7 +171,7 @@ public class ValueResolvingFilter implements ConfigFilter {
return stringValue;
}
} finally {
unresolvedKeys.remove(key);
UNRESOLVED_KEYS.get().remove(key);
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -113,7 +113,7 @@
* <h3>Monitoring Changes</h3>
*
* The program can react to configuration changes by passing a listener
* to {@link io.helidon.config.Config#onChange}.
* to {@link io.helidon.config.Config#onChange(java.util.function.Consumer)}.
*
* <h3 id="conversions">Converting Configuration to Java Types</h3>
* The {@link io.helidon.config.Config} class provides many methods for converting config

View File

@@ -41,7 +41,7 @@ import io.helidon.config.spi.ConfigParser.Content;
* @param <S> a type of data stamp
* @see Builder
*/
public abstract class AbstractConfigSource<S> extends AbstractMpSource<S> implements ConfigSource {
public abstract class AbstractConfigSource<S> extends AbstractMpSource<S> {
private final Function<Config.Key, String> mediaTypeMapping;
private final Function<Config.Key, ConfigParser> parserMapping;
@@ -63,6 +63,7 @@ public abstract class AbstractConfigSource<S> extends AbstractMpSource<S> implem
@Override
public final void init(ConfigContext context) {
configContext = context;
super.init(context);
}
/**

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -50,7 +50,7 @@ public abstract class AbstractMpSource<S> extends AbstractSource<ConfigNode.Obje
@Override
public void init(ConfigContext context) {
this.changes().subscribe(new Flow.Subscriber<Optional<ConfigNode.ObjectNode>>() {
this.changes().subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,12 +21,12 @@ import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.Config;
import io.helidon.config.ConfigException;
import io.helidon.config.ConfigHelper;
@@ -122,7 +122,11 @@ public abstract class AbstractSource<T, S> implements Source<T> {
pollingEventSubscriber = null;
}
Flow.Publisher<Optional<T>> changesPublisher() {
/**
* Publisher of changes of this source.
* @return publisher of source data
*/
protected Flow.Publisher<Optional<T>> changesPublisher() {
return changesPublisher;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -70,7 +70,6 @@ public interface Source<T> extends Changeable<T>, AutoCloseable {
//
@Override
@Deprecated
default Flow.Publisher<Optional<T>> changes() { //TODO later remove, see Changeable interface
return Flow.Subscriber::onComplete;
}

View File

@@ -0,0 +1,55 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.config;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class ConfigChangeListener {
private CountDownLatch cdl = new CountDownLatch(1);
private volatile Config updatedConfig;
public void onChange(Config config) {
updatedConfig = config;
cdl.countDown();
}
/**
* Get the last obtained changed config.
*
* @param millisToWait milliseconds to wait for the event to happen
* @param expectedAwait expected result of the count down latch timeout (true for success, false for timeout)
* @return config instance or null if none was received
*/
public Config get(long millisToWait, boolean expectedAwait) {
try {
boolean result = cdl.await(millisToWait, TimeUnit.MILLISECONDS);
assertThat(result, is(expectedAwait));
} catch (InterruptedException e) {
// ignored
}
return updatedConfig;
}
public void reset() {
cdl = new CountDownLatch(1);
updatedConfig = null;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,7 +20,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import io.helidon.config.spi.ConfigNode;
import io.helidon.config.spi.ConfigNode.ObjectNode;
@@ -28,20 +29,13 @@ import io.helidon.config.spi.TestingConfigSource;
import org.junit.jupiter.api.Test;
import static io.helidon.config.ConfigTest.waitFor;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests related to {@link Config#changes()} and/or {@link Config#onChange(Function)}.
* Tests related to {@link Config#onChange(java.util.function.Consumer)}.
*/
public class ConfigChangesTest {
@@ -63,9 +57,8 @@ public class ConfigChangesTest {
assertThat(config.get("key1").exists(), is(false));
// register subscriber
TestingConfigChangeSubscriber subscriber = new TestingConfigChangeSubscriber();
config.get("key1").changes().subscribe(subscriber);
subscriber.request1();
ConfigChangeListener listener = new ConfigChangeListener();
config.get("key1").onChange(listener::onChange);
// change config source
TimeUnit.MILLISECONDS.sleep(TEST_DELAY_MS); // Make sure timestamp changes.
@@ -77,7 +70,7 @@ public class ConfigChangesTest {
.build());
// wait for event
Config newConfig = subscriber.getLastOnNext(1000, true);
Config newConfig = listener.get(500, true);
// new: key exists
assertThat(newConfig.exists(), is(true));
@@ -107,9 +100,8 @@ public class ConfigChangesTest {
assertThat(config.get("key-1-1").exists(), is(true));
// register subscriber
TestingConfigChangeSubscriber subscriber = new TestingConfigChangeSubscriber();
config.get("key-1-1.key-2-1").changes().subscribe(subscriber);
subscriber.request1();
ConfigChangeListener listener = new ConfigChangeListener();
config.get("key-1-1.key-2-1").onChange(listener::onChange);
// change config source
TimeUnit.MILLISECONDS.sleep(TEST_DELAY_MS); // Make sure timestamp changes.
@@ -122,7 +114,7 @@ public class ConfigChangesTest {
.build());
// wait for event
assertThat(subscriber.getLastOnNext(1000, false), is(nullValue()));
assertThat(listener.get(500, false), is(nullValue()));
}
@Test
@@ -139,52 +131,51 @@ public class ConfigChangesTest {
.build();
// register subscriber1 on original leaf
TestingConfigChangeSubscriber subscriber1 = new TestingConfigChangeSubscriber();
ConfigChangeListener listener1 = new ConfigChangeListener();
config.get("key-1-1.key-2-1")
.changes().subscribe(subscriber1);
subscriber1.request1();
.onChange(listener1::onChange);
// register subscriber2 on leaf of DETACHED parent
TestingConfigChangeSubscriber subscriber2 = new TestingConfigChangeSubscriber();
ConfigChangeListener listener2 = new ConfigChangeListener();
config.get("key-1-1")
.detach()
.get("key-2-1")
.changes().subscribe(subscriber2);
subscriber2.request1();
.onChange(listener2::onChange);
// register subscriber3 on DETACHED leaf
TestingConfigChangeSubscriber subscriber3 = new TestingConfigChangeSubscriber();
ConfigChangeListener listener3 = new ConfigChangeListener();
config.get("key-1-1.key-2-1")
.detach()
.changes().subscribe(subscriber3);
subscriber3.request1();
.onChange(listener3::onChange);
// change config source
TimeUnit.MILLISECONDS.sleep(TEST_DELAY_MS); // Make sure timestamp changes.
configSource.changeLoadedObjectNode(ObjectNode.simple("key-1-1.key-2-1", "NEW item 1"));
// wait for event1
Config last1 = subscriber1.getLastOnNext(200, true);
Config last1 = listener1.get(200, true);
assertThat(last1.key().toString(), is("key-1-1.key-2-1"));
assertThat(last1.asString().get(), is("NEW item 1"));
// wait for event2
Config last2 = subscriber2.getLastOnNext(200, true);
Config last2 = listener2.get(200, true);
assertThat(last2.key().toString(), is("key-2-1"));
assertThat(last2.asString().get(), is("NEW item 1"));
// wait for event3
Config last3 = subscriber3.getLastOnNext(200, true);
Config last3 = listener3.get(200, true);
assertThat(last3.key().toString(), is(""));
assertThat(last3.asString().get(), is("NEW item 1"));
// timestamp 1==2==3
// no other events
subscriber1.request1();
subscriber2.request1();
subscriber3.request1();
assertThat(subscriber1.getLastOnNext(500, false), is(nullValue()));
assertThat(subscriber2.getLastOnNext(500, false), is(nullValue()));
assertThat(subscriber3.getLastOnNext(500, false), is(nullValue()));
listener1.reset();
listener2.reset();
listener3.reset();
// no need to wait for a long time, the event will not come
assertThat(listener1.get(50, false), is(nullValue()));
assertThat(listener2.get(50, false), is(nullValue()));
assertThat(listener3.get(50, false), is(nullValue()));
}
@Test
@@ -201,16 +192,15 @@ public class ConfigChangesTest {
.build();
// register subscriber1
TestingConfigChangeSubscriber subscriber1 = new TestingConfigChangeSubscriber();
ConfigChangeListener subscriber1 = new ConfigChangeListener();
config.get("key-1-1")
.changes().subscribe(subscriber1);
subscriber1.request1();
.onChange(subscriber1::onChange);
// register subscriber2 on DETACHED leaf
TestingConfigChangeSubscriber subscriber2 = new TestingConfigChangeSubscriber();
ConfigChangeListener subscriber2 = new ConfigChangeListener();
config.get("key-1-1")
.detach()
.changes().subscribe(subscriber2);
subscriber2.request1();
.onChange(subscriber2::onChange);
// change config source
TimeUnit.MILLISECONDS.sleep(TEST_DELAY_MS); // Make sure timestamp changes.
@@ -218,23 +208,23 @@ public class ConfigChangesTest {
ObjectNode.builder().addValue("key-1-1.key-2-1", "NEW item 1").build());
// wait for event
assertThat(subscriber1.getLastOnNext(200, true).key().toString(), is("key-1-1"));
assertThat(subscriber1.get(200, true).key().toString(), is("key-1-1"));
// wait for event1
Config last1 = subscriber1.getLastOnNext(200, true);
Config last1 = subscriber1.get(200, true);
assertThat(last1.key().toString(), is("key-1-1"));
assertThat(last1.get("key-2-1").asString().get(), is("NEW item 1"));
// wait for event2
Config last2 = subscriber2.getLastOnNext(200, true);
Config last2 = subscriber2.get(200, true);
assertThat(last2.key().toString(), is(""));
assertThat(last2.get("key-2-1").asString().get(), is("NEW item 1"));
// no other events
subscriber1.request1();
subscriber2.request1();
assertThat(subscriber1.getLastOnNext(500, false), is(nullValue()));
assertThat(subscriber2.getLastOnNext(500, false), is(nullValue()));
subscriber1.reset();
subscriber2.reset();
assertThat(subscriber1.get(50, false), is(nullValue()));
assertThat(subscriber2.get(50, false), is(nullValue()));
}
@Test
@@ -251,23 +241,22 @@ public class ConfigChangesTest {
.build();
// register subscriber1
TestingConfigChangeSubscriber subscriber1 = new TestingConfigChangeSubscriber();
config.changes().subscribe(subscriber1);
subscriber1.request1();
ConfigChangeListener subscriber1 = new ConfigChangeListener();
config.onChange(subscriber1::onChange);
// change config source
TimeUnit.MILLISECONDS.sleep(TEST_DELAY_MS); // Make sure timestamp changes.
configSource.changeLoadedObjectNode(
ObjectNode.builder().addValue("key-1-1.key-2-1", "NEW item 1").build());
// wait for event
Config last1 = subscriber1.getLastOnNext(200, true);
Config last1 = subscriber1.get(200, true);
assertThat(last1.key().toString(), is(""));
assertThat(last1.get("key-1-1.key-2-1").asString().get(), is("NEW item 1"));
// no other events
subscriber1.request1();
assertThat(subscriber1.getLastOnNext(500, false), is(nullValue()));
subscriber1.reset();
assertThat(subscriber1.get(50, false), is(nullValue()));
}
@Test
@@ -281,47 +270,12 @@ public class ConfigChangesTest {
.sources(configSource)
.build();
//Config not yet subscribed on config source
assertThat(configSource.isSubscribePollingStrategyInvoked(), is(false));
assertThat(configSource.isCancelPollingStrategyInvoked(), is(false));
List<TestingConfigChangeSubscriber> subscribers = new LinkedList<>();
List<ConfigChangeListener> subscribers = new LinkedList<>();
List.of("", "key1", "sub.key1", "", "key1").forEach(key -> {
TestingConfigChangeSubscriber subscriber = new TestingConfigChangeSubscriber();
config.get(key).changes().subscribe(subscriber);
ConfigChangeListener subscriber = new ConfigChangeListener();
config.get(key).onChange(subscriber::onChange);
subscribers.add(subscriber);
try {
subscriber.request1();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//config factory contains 5 subscribers
assertThat(config.factory().provider().changesSubmitter().getNumberOfSubscribers(), is(5));
//Config already subscribed on config source
waitFor(configSource::isSubscribePollingStrategyInvoked, 500, 10);
assertThat(configSource.isCancelPollingStrategyInvoked(), is(false));
//config source just 1
assertThat(configSource.changesSubmitter().getNumberOfSubscribers(), is(1));
TimeUnit.MILLISECONDS.sleep(TEST_DELAY_MS); // Make sure timestamp changes.
configSource.changeLoadedObjectNode(null);
//un-subscribe all
subscribers.forEach(subscriber -> subscriber.getSubscription().cancel());
//config factory does not have subscribers
waitFor(() -> !config.factory().provider().changesSubmitter().hasSubscribers(), 1_000, 10);
//Config already canceled from config source changes
assertThat(configSource.isSubscribePollingStrategyInvoked(), is(true));
assertThat(configSource.isCancelPollingStrategyInvoked(), is(true));
//config source does not have subscribers
waitFor(() -> !configSource.changesSubmitter().hasSubscribers(), 1_000, 10);
}
@Test
@@ -336,9 +290,8 @@ public class ConfigChangesTest {
assertThat(config.get("key1").exists(), is(false));
// register subscriber
TestingConfigChangeSubscriber subscriber = new TestingConfigChangeSubscriber();
config.get("key1").changes().subscribe(subscriber);
subscriber.request1();
ConfigChangeListener subscriber = new ConfigChangeListener();
config.get("key1").onChange(subscriber::onChange);
// change config source
TimeUnit.MILLISECONDS.sleep(TEST_DELAY_MS); // Make sure timestamp changes.
@@ -351,7 +304,7 @@ public class ConfigChangesTest {
.build());
// wait for event
Config newConfig = subscriber.getLastOnNext(1000, true);
Config newConfig = subscriber.get(1000, true);
// new: key exists
assertThat(newConfig.exists(), is(true));
@@ -371,9 +324,8 @@ public class ConfigChangesTest {
assertThat(config.get("key1").exists(), is(false));
// register subscriber
TestingConfigChangeSubscriber subscriber = new TestingConfigChangeSubscriber();
config.get("key1").changes().subscribe(subscriber);
subscriber.request1();
ConfigChangeListener subscriber = new ConfigChangeListener();
config.get("key1").onChange(subscriber::onChange);
// change config source
TimeUnit.MILLISECONDS.sleep(TEST_DELAY_MS); // Make sure timestamp changes.
@@ -383,7 +335,7 @@ public class ConfigChangesTest {
.build());
// wait for event
Config newConfig = subscriber.getLastOnNext(1000, true);
Config newConfig = subscriber.get(1000, true);
// new: key exists
assertThat(newConfig.exists(), is(true));
@@ -413,16 +365,15 @@ public class ConfigChangesTest {
assertThat(config.get("key1").get("sub1").asString().get(), is("string value"));
// register subscriber
TestingConfigChangeSubscriber subscriber = new TestingConfigChangeSubscriber();
config.get("key1").changes().subscribe(subscriber);
subscriber.request1();
ConfigChangeListener subscriber = new ConfigChangeListener();
config.get("key1").onChange(subscriber::onChange);
// change config source
TimeUnit.MILLISECONDS.sleep(TEST_DELAY_MS); // Make sure timestamp changes.
configSource.changeLoadedObjectNode(null);
// wait for event
Config newConfig = subscriber.getLastOnNext(1000, true);
Config newConfig = subscriber.get(1000, true);
// new: key does not exist
assertThat("New config should not exist", newConfig.exists(), is(false));
@@ -452,16 +403,15 @@ public class ConfigChangesTest {
assertThat(config.get("key1").asList(String.class).get(), contains("item 1", "item 2"));
// register subscriber
TestingConfigChangeSubscriber subscriber = new TestingConfigChangeSubscriber();
config.get("key1").changes().subscribe(subscriber);
subscriber.request1();
ConfigChangeListener subscriber = new ConfigChangeListener();
config.get("key1").onChange(subscriber::onChange);
// change config source
TimeUnit.MILLISECONDS.sleep(TEST_DELAY_MS); // Make sure timestamp changes.
configSource.changeLoadedObjectNode(null);
// wait for event
Config newConfig = subscriber.getLastOnNext(1000, true);
Config newConfig = subscriber.get(1000, true);
// new: key does not exist
assertThat(newConfig.exists(), is(false));
@@ -488,16 +438,15 @@ public class ConfigChangesTest {
assertThat(config.get("key1").asString().get(), is("string value"));
// register subscriber
TestingConfigChangeSubscriber subscriber = new TestingConfigChangeSubscriber();
config.get("key1").changes().subscribe(subscriber);
subscriber.request1();
ConfigChangeListener subscriber = new ConfigChangeListener();
config.get("key1").onChange(subscriber::onChange);
// change config source
TimeUnit.MILLISECONDS.sleep(TEST_DELAY_MS); // Make sure timestamp changes.
configSource.changeLoadedObjectNode(null);
// wait for event
Config newConfig = subscriber.getLastOnNext(1000, true);
Config newConfig = subscriber.get(1000, true);
// new: key does not exist
assertThat(newConfig.exists(), is(false));
@@ -522,12 +471,12 @@ public class ConfigChangesTest {
assertThat(config.get("key1").asString().get(), is("string value"));
//MOCK onNextFunction
Function<Config, Boolean> onNextFunction = mock(Function.class);
CountDownLatch onNextLatch = new CountDownLatch(1);
when(onNextFunction.apply(any())).then(invocationOnMock -> {
AtomicReference<Config> newConfigReference = new AtomicReference<>();
Consumer<Config> onNextFunction = aConfig -> {
onNextLatch.countDown();
return true;
});
newConfigReference.set(aConfig);
};
// register subscriber
config.get("key1").onChange(onNextFunction);
@@ -540,17 +489,15 @@ public class ConfigChangesTest {
ObjectNode.builder().addValue("key1", "string value 2").build());
// wait for event
onNextLatch.await(5, TimeUnit.SECONDS);
onNextLatch.await(2, TimeUnit.SECONDS);
// verify event
verify(onNextFunction, times(1)).apply(argThat(newConfig -> {
// new: key does exist
assertThat(newConfig.exists(), is(true));
assertThat(newConfig.type(), is(Config.Type.VALUE));
assertThat(newConfig.asString().get(), is("string value 2"));
Config newConfig = newConfigReference.get();
return true;
}));
// new: key does exist
assertThat(newConfig.exists(), is(true));
assertThat(newConfig.type(), is(Config.Type.VALUE));
assertThat(newConfig.asString().get(), is("string value 2"));
}
@Test
@@ -570,9 +517,8 @@ public class ConfigChangesTest {
assertThat(v1.get(key1).asString().get(), is("value"));
// subscribe s1 on v1
TestingConfigChangeSubscriber s1 = new TestingConfigChangeSubscriber();
v1.changes().subscribe(s1);
s1.request1();
ConfigChangeListener s1 = new ConfigChangeListener();
v1.onChange(s1::onChange);
///////////////////////////// FIRST change -> subscriber receives event
// change source => config v2
@@ -580,21 +526,20 @@ public class ConfigChangesTest {
configSource.changeLoadedObjectNode(
ObjectNode.builder().addValue(fullKey, "value 2").build());
// s1 receives v2
Config v2 = s1.getLastOnNext(200, true);
Config v2 = s1.get(200, true);
assertThat(v2.get(key1).asString().get(), is("value 2"));
s1.request1();
s1.reset();
///////////////////// subscribing on old Config -> subscriber receives (OLD) already fired event
// subscribe s2 on v1
TestingConfigChangeSubscriber s2 = new TestingConfigChangeSubscriber();
v1.changes().subscribe(s2);
s2.request1();
ConfigChangeListener s2 = new ConfigChangeListener();
v1.onChange(s2::onChange);
// s2 receives v2
Config s2v2 = s2.getLastOnNext(1200, true);
Config s2v2 = s2.get(1200, true);
assertThat(s2v2.get(key1).asString(), is(ConfigValues.simpleValue("value 2")));
//same v2s
assertThat(v2, is(s2v2));
s2.request1();
s2.reset();
///////////////////////////// another change -> BOTH subscribers receives NEW event
// change source => config v3
@@ -602,38 +547,36 @@ public class ConfigChangesTest {
configSource.changeLoadedObjectNode(
ObjectNode.builder().addValue(fullKey, "value 3").build());
// s1 receives v3
Config v3 = s1.getLastOnNext(200, true);
Config v3 = s1.get(200, true);
assertThat(v3.get(key1).asString(), is(ConfigValues.simpleValue("value 3")));
s1.request1();
s1.reset();
// s2 receives v3
Config s2v3 = s2.getLastOnNext(200, true);
Config s2v3 = s2.get(200, true);
assertThat(s2v3.get(key1).asString(), is(ConfigValues.simpleValue("value 3")));
s2.request1();
s2.reset();
//same v3s
assertThat(v3, is(s2v3));
///////////////////// new subscriber on V1 receives JUST the last event V3
// subscribe s3 on v1
TestingConfigChangeSubscriber s3 = new TestingConfigChangeSubscriber();
v1.changes().subscribe(s3);
s3.request1();
ConfigChangeListener s3 = new ConfigChangeListener();
v1.onChange(s3::onChange);
// s3 receives v3
Config s3v3 = s3.getLastOnNext(200, true);
Config s3v3 = s3.get(200, true);
assertThat(s3v3.get(key1).asString(), is(ConfigValues.simpleValue("value 3")));
s3.request1();
s3.reset();
//same v3s
assertThat(v3, is(s2v3));
assertThat(v3, is(s3v3));
///////////////////// new subscriber on V2 receives also JUST the last event V3
// subscribe s4 on v2
TestingConfigChangeSubscriber s4 = new TestingConfigChangeSubscriber();
v2.changes().subscribe(s4);
s4.request1();
ConfigChangeListener s4 = new ConfigChangeListener();
v2.onChange(s4::onChange);
// s4 receives v3
Config s4v3 = s4.getLastOnNext(200, true);
Config s4v3 = s4.get(200, true);
assertThat(s4v3.get(key1).asString(), is(ConfigValues.simpleValue("value 3")));
s4.request1();
s4.reset();
//same v3s
assertThat(v3, is(s2v3));
assertThat(v3, is(s3v3));
@@ -645,21 +588,21 @@ public class ConfigChangesTest {
configSource.changeLoadedObjectNode(
ObjectNode.builder().addValue(fullKey, "value 4").build());
// s1 receives v4
Config v4 = s1.getLastOnNext(200, true);
Config v4 = s1.get(200, true);
assertThat(v4.get(key1).asString(), is(ConfigValues.simpleValue("value 4")));
s1.request1();
s1.reset();
// s2 receives v4
Config s2v4 = s2.getLastOnNext(200, true);
Config s2v4 = s2.get(200, true);
assertThat(s2v4.get(key1).asString(), is(ConfigValues.simpleValue("value 4")));
s2.request1();
s2.reset();
// s3 receives v4
Config s3v4 = s3.getLastOnNext(200, true);
Config s3v4 = s3.get(200, true);
assertThat(s3v4.get(key1).asString(), is(ConfigValues.simpleValue("value 4")));
s3.request1();
s3.reset();
// s4 receives v4
Config s4v4 = s4.getLastOnNext(200, true);
Config s4v4 = s4.get(200, true);
assertThat(s4v4.get(key1).asString(), is(ConfigValues.simpleValue("value 4")));
s4.request1();
s4.reset();
//same v4s
assertThat(v4, is(s2v4));
assertThat(v4, is(s3v4));
@@ -667,11 +610,10 @@ public class ConfigChangesTest {
///////////////////// subscribing on the LAST Config does NOT fire the last event to subscriber
// subscribe s5 on v4
TestingConfigChangeSubscriber s5 = new TestingConfigChangeSubscriber();
v4.changes().subscribe(s5);
s5.request1();
ConfigChangeListener s5 = new ConfigChangeListener();
v4.onChange(s5::onChange);
// s5 must NOT receive v4
Config s5event = s5.getLastOnNext(200, false);
Config s5event = s5.get(200, false);
assertThat(s5event, is(nullValue()));
///////////////////////////// another change -> ALL subscribers receives NEW event, no matter what ver. they subscribed on
@@ -680,19 +622,19 @@ public class ConfigChangesTest {
configSource.changeLoadedObjectNode(
ObjectNode.builder().addValue(fullKey, "value 5").build());
// s1 receives v5
Config v5 = s1.getLastOnNext(200, true);
Config v5 = s1.get(200, true);
assertConfigValue(v5.get(key1).asString(), "value 5");
// s2 receives v5
Config s2v5 = s2.getLastOnNext(200, true);
Config s2v5 = s2.get(200, true);
assertConfigValue(s2v5.get(key1).asString(), "value 5");
// s3 receives v5
Config s3v5 = s3.getLastOnNext(200, true);
Config s3v5 = s3.get(200, true);
assertConfigValue(s3v5.get(key1).asString(), "value 5");
// s4 receives v5
Config s4v5 = s4.getLastOnNext(200, true);
Config s4v5 = s4.get(200, true);
assertConfigValue(s4v5.get(key1).asString(), "value 5");
// s5 receives v5
Config s5v5 = s5.getLastOnNext(200, true);
Config s5v5 = s5.get(200, true);
assertConfigValue(s5v5.get(key1).asString(), "value 5");
//same v5s
assertThat(v5, is(s2v5));
@@ -700,6 +642,7 @@ public class ConfigChangesTest {
assertThat(v5, is(s4v5));
assertThat(v5, is(s5v5));
}
// todo maybe move to a shared place, so we can play around with method singatures
public static <T> void assertConfigValue(ConfigValue<T> value, T expectedValue) {
assertThat(value, is(ConfigValues.simpleValue(expectedValue)));

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -126,9 +126,8 @@ public class ConfigContextTest {
public void testContextReloadNoPollingSourceSame(Map.Entry<String,String> e) throws InterruptedException {
TestContext c = new TestContext(e);
// subscribe on changes
TestingConfigChangeSubscriber subscriber1 = new TestingConfigChangeSubscriber();
c.config.changes().subscribe(subscriber1);
subscriber1.request1();
ConfigChangeListener listener = new ConfigChangeListener();
c.config.onChange(listener::onChange);
// config contains old data
assertThat(c.config.get(PROP1).asString().get(), is(c.oldValue));
@@ -145,7 +144,7 @@ public class ConfigContextTest {
assertConfigIsTheLast(c.config.context(), reloaded);
// no other events
assertThat(subscriber1.getLastOnNext(500, false), is(nullValue()));
assertThat(listener.get(500, false), is(nullValue()));
}
private static void assertConfigIsTheLast(Config.Context context, Config config) {
@@ -159,9 +158,8 @@ public class ConfigContextTest {
public void testContextReloadNoPollingSourceChanged(Map.Entry<String,String> e) throws InterruptedException {
TestContext c = new TestContext(e);
// subscribe on changes
TestingConfigChangeSubscriber subscriber1 = new TestingConfigChangeSubscriber();
c.config.changes().subscribe(subscriber1);
subscriber1.request1();
ConfigChangeListener listener = new ConfigChangeListener();
c.config.onChange(listener::onChange);
// config contains old data
assertThat(c.config.get(PROP1).asString().get(), is(c.oldValue));
@@ -181,7 +179,7 @@ public class ConfigContextTest {
assertConfigIsTheLast(c.config.context(), reloaded);
// change event
Config last1 = subscriber1.getLastOnNext(200, true);
Config last1 = listener.get(500, true);
assertThat(last1.key().toString(), is(c.key));
assertThat(last1.get(PROP1).asString(), is(ConfigValues.simpleValue(c.newValue)));
}
@@ -192,9 +190,8 @@ public class ConfigContextTest {
) throws InterruptedException {
TestContext c = new TestContext(e);
// subscribe on changes
TestingConfigChangeSubscriber subscriber1 = new TestingConfigChangeSubscriber();
c.config.changes().subscribe(subscriber1);
subscriber1.request1();
ConfigChangeListener listener = new ConfigChangeListener();
c.config.onChange(listener::onChange);
// config contains old data
assertThat(c.config.get(PROP1).asString().get(), is(c.oldValue));
@@ -204,7 +201,7 @@ public class ConfigContextTest {
c.changeSource(true, "old");
// no other events
assertThat(subscriber1.getLastOnNext(500, false), is(nullValue()));
assertThat(listener.get(500, false), is(nullValue()));
// context references the last reloaded config
assertConfigIsTheLast(c.config.context(), c.config);
@@ -214,9 +211,8 @@ public class ConfigContextTest {
@MethodSource("initParams")
public void testWithPollingSourceChanged(Map.Entry<String,String> e) throws InterruptedException {
TestContext c = new TestContext(e);// subscribe on changes
TestingConfigChangeSubscriber subscriber1 = new TestingConfigChangeSubscriber();
c.config.changes().subscribe(subscriber1);
subscriber1.request1();
ConfigChangeListener listener = new ConfigChangeListener();
c.config.onChange(listener::onChange);
// config contains old data
assertThat(c.config.get(PROP1).asString().get(), is(c.oldValue));
@@ -226,7 +222,7 @@ public class ConfigContextTest {
c.changeSource(true, "new");
// change event
Config last1 = subscriber1.getLastOnNext(200, true);
Config last1 = listener.get(500, true);
assertThat(last1.key().toString(), is(c.key));
assertThat(last1.get(PROP1).asString(), is(ConfigValues.simpleValue(c.newValue)));

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,10 +19,10 @@ package io.helidon.config;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.spi.ConfigContext;
import io.helidon.config.spi.ConfigNode.ObjectNode;
import io.helidon.config.spi.ConfigSource;

View File

@@ -188,30 +188,10 @@ public class FileConfigSourceTest {
.build();
CountDownLatch latch = new CountDownLatch(1);
config.changes().subscribe(new Flow.Subscriber<Config>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
}
@Override
public void onNext(Config item) {
}
@Override
public void onError(Throwable throwable) {
latch.countDown();
}
@Override
public void onComplete() {
latch.countDown();
}
});
config.onChange((event) -> true);
config.onChange(newConfig -> latch.countDown());
assertThat(latch.await(120, TimeUnit.SECONDS), is(false));
}
@Test

View File

@@ -1,59 +0,0 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.config;
import java.util.concurrent.Flow;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ListenerTest {
/**
* Compilation test.
*/
@Test
public void compilation() {
assertThrows(RuntimeException.class, () -> {
Config config = null;
config.get("my").get("app").get("security").changes().subscribe(new ConfigChangeSubscriber());
});
}
class ConfigChangeSubscriber implements Flow.Subscriber<Config> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Config event) {
//react on change of appropriate ConfigSource
}
@Override
public void onError(Throwable throwable) {
//the ConfigSource is not more accessible...
}
@Override
public void onComplete() {
}
}
}

View File

@@ -1,23 +0,0 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.config;
/**
* Testing implementation of {@link java.util.concurrent.Flow.Subscriber} on {@link Config#changes()} events.
*/
public class TestingConfigChangeSubscriber extends TestingSubscriber<Config> {
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,8 +19,8 @@ package io.helidon.config;
import java.time.Instant;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.spi.PollingStrategy;
/**

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@ package io.helidon.config.internal;
import io.helidon.config.Config;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -26,60 +27,65 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
/**
* Tests {@link ConfigKeyImpl}.
*/
public class ConfigKeyImplTest {
private static final ConfigKeyImpl ROOT = ConfigKeyImpl.of();
@Test
public void testConfigKeyOf() {
assertThatKey((ConfigKeyImpl) Config.Key.create(""), true, nullValue(), "", "");
assertThatKey((ConfigKeyImpl) Config.Key.create("aaa"), false, not(nullValue()), "aaa", "aaa");
assertThatKey((ConfigKeyImpl) Config.Key.create("aaa.bbb.ccc"), false, not(nullValue()), "ccc", "aaa.bbb.ccc");
assertThatKey((ConfigKeyImpl) Config.Key.create(""), true, null, "", "");
assertThatKey((ConfigKeyImpl) Config.Key.create("aaa"), false, is(ROOT), "aaa", "aaa");
assertThatKey((ConfigKeyImpl) Config.Key.create("aaa.bbb.ccc"), false, not(ROOT), "ccc", "aaa.bbb.ccc");
}
@Test
public void testOfRoot() {
assertThatKey(ConfigKeyImpl.of(), true, nullValue(), "", "");
assertThatKey(ConfigKeyImpl.of(""), true, nullValue(), "", "");
assertThatKey(ConfigKeyImpl.of().child(""), true, nullValue(), "", "");
assertThatKey(ConfigKeyImpl.of().child(ConfigKeyImpl.of()), true, nullValue(), "", "");
assertThatKey(ConfigKeyImpl.of(), true, is(ROOT), "", "");
assertThatKey(ConfigKeyImpl.of(""), true, is(ROOT), "", "");
assertThatKey(ConfigKeyImpl.of().child(""), true, is(ROOT), "", "");
assertThatKey(ConfigKeyImpl.of().child(ConfigKeyImpl.of()), true, is(ROOT), "", "");
}
@Test
public void testOf() {
assertThatKey(ConfigKeyImpl.of("aaa"), false, not(nullValue()), "aaa", "aaa");
assertThatKey(ConfigKeyImpl.of("aaa.bbb"), false, not(nullValue()), "bbb", "aaa.bbb");
assertThatKey(ConfigKeyImpl.of("aaa.bbb.ccc"), false, not(nullValue()), "ccc", "aaa.bbb.ccc");
assertThatKey(ConfigKeyImpl.of("aaa"), false, is(ROOT), "aaa", "aaa");
assertThatKey(ConfigKeyImpl.of("aaa.bbb"), false, not(ROOT), "bbb", "aaa.bbb");
assertThatKey(ConfigKeyImpl.of("aaa.bbb.ccc"), false, not(ROOT), "ccc", "aaa.bbb.ccc");
}
@Test
public void testChildLevel1() {
assertThatKey(ConfigKeyImpl.of().child("aaa"), false, not(nullValue()), "aaa", "aaa");
assertThatKey(ConfigKeyImpl.of().child(ConfigKeyImpl.of("aaa")), false, not(nullValue()), "aaa", "aaa");
assertThatKey(ConfigKeyImpl.of().child("aaa"), false, is(ROOT), "aaa", "aaa");
assertThatKey(ConfigKeyImpl.of().child(ConfigKeyImpl.of("aaa")), false, is(ROOT), "aaa", "aaa");
}
@Test
public void testChildLevel2() {
assertThatKey(ConfigKeyImpl.of("aaa").child("bbb"), false, not(nullValue()), "bbb", "aaa.bbb");
assertThatKey(ConfigKeyImpl.of("aaa").child(ConfigKeyImpl.of("bbb")), false, not(nullValue()), "bbb", "aaa.bbb");
assertThatKey(ConfigKeyImpl.of("aaa").child("bbb"), false, not(ROOT), "bbb", "aaa.bbb");
assertThatKey(ConfigKeyImpl.of("aaa").child(ConfigKeyImpl.of("bbb")), false, not(ROOT), "bbb", "aaa.bbb");
assertThatKey(ConfigKeyImpl.of().child("aaa.bbb"), false, not(nullValue()), "bbb", "aaa.bbb");
assertThatKey(ConfigKeyImpl.of().child(ConfigKeyImpl.of("aaa.bbb")), false, not(nullValue()), "bbb", "aaa.bbb");
assertThatKey(ConfigKeyImpl.of().child("aaa.bbb"), false, not(ROOT), "bbb", "aaa.bbb");
assertThatKey(ConfigKeyImpl.of().child(ConfigKeyImpl.of("aaa.bbb")), false, not(ROOT), "bbb", "aaa.bbb");
}
@Test
public void testChildLevel3() {
assertThatKey(ConfigKeyImpl.of().child("aaa").child("bbb").child("ccc"), false, not(nullValue()), "ccc", "aaa.bbb.ccc");
assertThatKey(ConfigKeyImpl.of().child("aaa.bbb").child("ccc"), false, not(nullValue()), "ccc", "aaa.bbb.ccc");
assertThatKey(ConfigKeyImpl.of().child("aaa").child("bbb.ccc"), false, not(nullValue()), "ccc", "aaa.bbb.ccc");
assertThatKey(ConfigKeyImpl.of().child("aaa.bbb.ccc"), false, not(nullValue()), "ccc", "aaa.bbb.ccc");
assertThatKey(ConfigKeyImpl.of().child("aaa").child("bbb").child("ccc"), false, not(ROOT), "ccc", "aaa.bbb.ccc");
assertThatKey(ConfigKeyImpl.of().child("aaa.bbb").child("ccc"), false, not(ROOT), "ccc", "aaa.bbb.ccc");
assertThatKey(ConfigKeyImpl.of().child("aaa").child("bbb.ccc"), false, not(ROOT), "ccc", "aaa.bbb.ccc");
assertThatKey(ConfigKeyImpl.of().child("aaa.bbb.ccc"), false, not(ROOT), "ccc", "aaa.bbb.ccc");
}
private void assertThatKey(ConfigKeyImpl key, boolean root, Matcher<Object> parentMatcher, String name, String toString) {
assertThat(key.isRoot(), is(root));
assertThat(key.parent(), parentMatcher);
if (root) {
Assertions.assertThrows(IllegalStateException.class, key::parent);
} else {
assertThat(key.parent(), parentMatcher);
}
assertThat(key.name(), is(name));
assertThat(key.toString(), is(toString));
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,9 +26,9 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.spi.PollingStrategy;
import io.helidon.config.test.infra.TemporaryFolderExt;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
package io.helidon.config.internal;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.Duration;
@@ -26,9 +25,9 @@ import java.util.Optional;
import java.util.stream.Collectors;
import io.helidon.config.Config;
import io.helidon.config.ConfigChangeListener;
import io.helidon.config.ConfigSources;
import io.helidon.config.PollingStrategies;
import io.helidon.config.TestingConfigChangeSubscriber;
import io.helidon.config.spi.OverrideSource;
import com.xebialabs.restito.server.StubServer;
@@ -72,7 +71,7 @@ public class UrlOverrideSourceServerMockTest {
private StubServer server;
@BeforeEach
public void before() throws IOException {
public void before() {
server = new StubServer().run();
}
@@ -207,9 +206,9 @@ public class UrlOverrideSourceServerMockTest {
assertThat(config.get("aaa.bbb.url").asString().get(), is("URL1"));
// register subscriber
TestingConfigChangeSubscriber subscriber = new TestingConfigChangeSubscriber();
config.get("aaa.bbb.url").changes().subscribe(subscriber);
subscriber.request1();
ConfigChangeListener listener = new ConfigChangeListener();
config.get("aaa.bbb.url").onChange(listener::onChange);
whenHttp(server).
match(method(GET), uri("/override")).
@@ -220,7 +219,7 @@ public class UrlOverrideSourceServerMockTest {
);
// wait for event
Config newConfig = subscriber.getLastOnNext(1000, true);
Config newConfig = listener.get(500, true);
// new: key exists
assertThat(newConfig.asString().get(), is("URL2"));

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,11 +18,10 @@ package io.helidon.config.spi;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import io.helidon.common.reactive.SubmissionPublisher;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,8 +18,8 @@ package io.helidon.config.spi;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.SubmissionPublisher;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.Config;
import io.helidon.config.ConfigException;
import io.helidon.config.spi.ConfigNode.ObjectNode;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,10 +18,10 @@ package io.helidon.config.etcd;
import java.time.Instant;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.ConfigException;
import io.helidon.config.ConfigHelper;
import io.helidon.config.etcd.EtcdConfigSourceBuilder.EtcdEndpoint;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,12 +22,12 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.etcd.internal.client.EtcdClient;
import io.helidon.config.etcd.internal.client.EtcdClientException;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,11 +21,11 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.etcd.internal.client.EtcdClient;
import io.helidon.config.etcd.internal.client.EtcdClientException;
import io.helidon.config.etcd.internal.client.proto.KVGrpc;
@@ -79,6 +79,7 @@ public class EtcdV3Client implements EtcdClient {
throw new EtcdClientException("Cannot retrieve a value for the key: " + key, e);
}
}
@Override
public String get(String key) throws EtcdClientException {
RangeRequest.Builder builder = RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8(key));

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@ import java.net.URI;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import io.helidon.config.Config;
@@ -81,26 +80,8 @@ public class EtcdConfigSourceIT {
CountDownLatch initLatch = new CountDownLatch(1);
CountDownLatch nextLatch = new CountDownLatch(3);
config.changes().subscribe(new Flow.Subscriber<Config>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
initLatch.countDown();
}
config.onChange(it -> initLatch.countDown());
@Override
public void onNext(Config item) {
nextLatch.countDown();
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
});
assertThat(initLatch.await(1, TimeUnit.SECONDS), is(true));
putConfiguration(version, "/application2.conf");

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,9 +23,9 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicLong;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.config.etcd.internal.client.EtcdClient;
import io.helidon.config.etcd.internal.client.EtcdClientException;

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2017, 2020 Oracle and/or its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -52,11 +52,6 @@
<artifactId>restito</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.akarnokd</groupId>
<artifactId>rxjava2-jdk9-interop</artifactId>

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2019, 2020 Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2019, 2020 Oracle and/or its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -93,7 +93,6 @@
<version.lib.persistence-api>2.2</version.lib.persistence-api>
<version.lib.prometheus>0.6.0</version.lib.prometheus>
<version.lib.reactivestreams>1.0.2</version.lib.reactivestreams>
<version.lib.reactor>3.3.1.RELEASE</version.lib.reactor>
<version.lib.slf4j>1.7.26</version.lib.slf4j>
<version.lib.smallrye-openapi>1.1.1</version.lib.smallrye-openapi>
<version.lib.snakeyaml>1.24</version.lib.snakeyaml>
@@ -143,11 +142,6 @@
<artifactId>jaeger-client</artifactId>
<version>${version.lib.jaegertracing}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${version.lib.reactor}</version>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-util</artifactId>

View File

@@ -1,116 +0,0 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.config.examples.changes;
import java.util.concurrent.Flow;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.helidon.config.Config;
import io.helidon.config.PollingStrategies;
import static io.helidon.config.ConfigSources.classpath;
import static io.helidon.config.ConfigSources.file;
import static io.helidon.config.PollingStrategies.regular;
import static java.time.Duration.ofSeconds;
/**
* Example shows how to listen on Config node changes using
* {@link Flow.Subscriber}. Method {@link Flow.Subscriber#onNext(Object) onNext}
* is invoked with new instance of Config, see {@link Config#changes()} for more
* detail.
* <p>
* The feature is based on using {@link io.helidon.config.spi.PollingStrategy}
* with selected config source(s) to check for changes.
* <p>
* <h2>A note about {@code Config.changes() }, {@code Flow.Publisher}, and
* {@code Flow.Subscriber}</h2>
* This example uses the {@link Config#changes() } API. That method is marked as
* deprecated because it its return type is the Helidon-specific interface
* {@link Flow.Publisher} which mimics the interface with the same name in Java
* 9 and later. Similarly the {@link Flow.Publisher#subscribe} method accepts a
* Helidon-specific {@link Flow.Subscriber}.
* <p>
* Once Helidon requires Java 9 or later (as opposed to Java 8), the
* {@code Config.changes()} API might change to return the Java
* {@code Flow.Subscriber} interface instead of the Helidon-specific one. By
* marking the method as deprecated we encourage developers to be very careful
* in how they use the method and, specifically, its return value and the
* argument to {@code subscribe}. Developers should avoid propagating these
* Helidon-specific types throughout their code to minimize the disruption
* if and when the {@code Config.changes()} method evolves to return the Java,
* not Helidon, {@code Flow.Publisher}.
*/
public class ChangesSubscriberExample {
private static final Logger LOGGER = Logger.getLogger(ChangesSubscriberExample.class.getName());
/**
* Executes the example.
*/
public void run() {
Config config = Config
.create(file("conf/dev.yaml")
.optional()
.pollingStrategy(PollingStrategies::watch),
file("conf/config.yaml")
.optional()
.pollingStrategy(regular(ofSeconds(2))),
classpath("default.yaml")
.pollingStrategy(regular(ofSeconds(10))));
// first greeting
greeting(config.get("app"));
// subscribe using custom Flow.Subscriber - see class-level JavaDoc
config.get("app").changes()
.subscribe(new AppConfigSubscriber());
}
private void greeting(Config appConfig) {
LOGGER.info("[ChangesSubscriber] " + appConfig.get("greeting").asString() + " " + appConfig.get("name").asString() + ".");
}
/**
* Flow Subscriber on "app" config node.
*/
private class AppConfigSubscriber implements Flow.Subscriber<Config> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Config config) {
greeting(config);
}
@Override
public void onError(Throwable throwable) {
LOGGER.log(Level.WARNING,
throwable,
() -> "Config Changes support failed. " + throwable.getLocalizedMessage());
}
@Override
public void onComplete() {
LOGGER.info("Config Changes support finished. There will no other Config reload.");
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,8 +38,6 @@ public class Main {
* @throws IOException in case of IO error
*/
public static void main(String... args) throws IOException, InterruptedException {
// subscribe on config changes using custom Flow.Subscriber
new ChangesSubscriberExample().run();
// subscribe using simple onChange function
new OnChangeExample().run();
// use same Supplier instances to get up-to-date value

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
package io.helidon.config.examples.changes;
import java.util.function.Function;
import java.util.logging.Logger;
import io.helidon.config.Config;
@@ -26,7 +25,7 @@ import io.helidon.config.PollingStrategies;
import static java.time.Duration.ofSeconds;
/**
* Example shows how to listen on Config node changes using simplified API, {@link Config#onChange(Function)}.
* Example shows how to listen on Config node changes using simplified API, {@link Config#onChange(java.util.function.Consumer)}.
* The Function is invoked with new instance of Config.
* <p>
* The feature is based on using {@link io.helidon.config.spi.PollingStrategy} with

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -99,13 +99,11 @@ public class Main {
/**
* Initialize logging from config.
*/
private static boolean initLogging(Config loggingConfig) {
private static void initLogging(Config loggingConfig) {
String level = loggingConfig.get("level").asString().orElse("WARNING");
//e.g. initialize logging using configured level...
System.out.println("Set logging level to " + level + ".");
return true;
}
}

View File

@@ -50,10 +50,6 @@
<groupId>io.helidon.common</groupId>
<artifactId>helidon-common-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2019, 2020 Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2019, 2020 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,4 +20,4 @@
#
Args=--no-fallback \
--initialize-at-run-time=io.netty.util.internal.logging.Log4JLogger \
--initialize-at-build-time=io.helidon,io.netty,reactor.core,reactor.util,org.yaml.snakeyaml,org.reactivestreams,org.glassfish.json,org.eclipse.microprofile,io.opentracing,javax.json,org.glassfish.jersey,javax.ws.rs,org.glassfish.hk2,org.jvnet.hk2
--initialize-at-build-time=io.helidon,io.netty,org.yaml.snakeyaml,org.reactivestreams,org.glassfish.json,org.eclipse.microprofile,io.opentracing,javax.json,org.glassfish.jersey,javax.ws.rs,org.glassfish.hk2,org.jvnet.hk2

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,8 +20,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import io.helidon.common.reactive.SubmissionPublisher;
import java.util.concurrent.SubmissionPublisher;
import org.junit.jupiter.api.Test;

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2017, 2020 Oracle and/or its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -54,10 +54,6 @@
<groupId>io.helidon.jersey</groupId>
<artifactId>helidon-jersey-server</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.webserver</groupId>
<artifactId>helidon-webserver-test-support</artifactId>

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2017, 2020 Oracle and/or its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -42,10 +42,6 @@
<groupId>io.helidon.common</groupId>
<artifactId>helidon-common-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -33,7 +34,6 @@ import java.util.logging.Logger;
import io.helidon.common.http.DataChunk;
import io.helidon.common.http.Http;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.SubmissionPublisher;
import org.hamcrest.collection.IsCollectionWithSize;
import org.hamcrest.core.Is;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,13 +30,13 @@ import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.SubmissionPublisher;
import io.helidon.media.common.ContentReaders;
import io.helidon.media.common.MediaSupport;
import io.helidon.media.common.MessageBodyFilter;