A look at retry in reactor extra
Published: 2019-06-28


This article takes a look at retry in reactor extra.

Maven

    <dependency>
        <groupId>io.projectreactor.addons</groupId>
        <artifactId>reactor-extra</artifactId>
        <version>3.1.4.RELEASE</version>
    </dependency>

Example

    TcpClient client = TcpClient.create("localhost", 8888);
    client.newHandler((inbound, outbound) -> {
        return outbound.sendString(Mono.just("Hello World!"))
                .then(inbound.receive().asString().next().log().then());
    }).doOnError(e -> e.printStackTrace())
            .subscribe();

With this TcpClient, the connection fails immediately with an error if the server has not been started:

    io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8888
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.net.ConnectException: Connection refused
        ... 10 more

Reconnecting after a connection failure

Simple retry

    client.newHandler((inbound, outbound) -> {
        return outbound.sendString(Mono.just("Hello World!"))
                .then(inbound.receive().asString().next().log().then());
    }).doOnError(e -> e.printStackTrace())
            .retry(3)
            .subscribe();

retry takes the number of retries directly as its argument.
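To make the counting concrete: retry(3) re-subscribes up to three times after the first failure, so the connection is attempted at most four times in total. The blocking, plain-Java sketch below mimics only that counting behaviour. SimpleRetrySketch and callWithRetry are hypothetical names introduced for illustration; this is not how Reactor implements retry.

```java
import java.util.function.Supplier;

/** Hypothetical helper for illustration only; not part of Reactor. */
final class SimpleRetrySketch {

    /**
     * Calls the task once and, on failure, up to maxRetries more times.
     * Rethrows the last failure when every attempt has failed.
     */
    static <T> T callWithRetry(Supplier<T> task, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        try {
            // A task that always fails, standing in for the refused connection.
            callWithRetry(() -> {
                calls[0]++;
                throw new IllegalStateException("Connection refused");
            }, 3);
        } catch (IllegalStateException e) {
            // prints "gave up after 4 attempts"
            System.out.println("gave up after " + calls[0] + " attempts");
        }
    }
}
```

Unlike this loop, the reactive operator does not block a thread between attempts; the sketch is only meant to show how the count is interpreted.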

Advanced retry

    client.newHandler((inbound, outbound) -> {
        return outbound.sendString(Mono.just("Hello World!"))
                .then(inbound.receive().asString().next().log().then());
    }).doOnError(e -> e.printStackTrace())
            .retryWhen(Retry.allBut(RuntimeException.class)
                    .retryMax(5000)
                    .fixedBackoff(Duration.ofSeconds(5))
                    .doOnRetry(e -> {
                        e.exception().printStackTrace();
                    })
            )
            .subscribe();

With the Retry helper class from the reactor extra project, it is easy to specify advanced retry strategies such as fixedBackoff or exponentialBackoff.

    client.newHandler((inbound, outbound) -> {
        return outbound.sendString(Mono.just("Hello World!"))
                .then(inbound.receive().asString().next().log().then());
    }).doOnError(e -> e.printStackTrace())
            .retryWhen(Retry.allBut(RuntimeException.class)
                    .retryMax(5000)
                    .exponentialBackoffWithJitter(Duration.ofMillis(100), Duration.ofMillis(500))
                    .doOnRetry(e -> {
                        e.exception().printStackTrace();
                    })
            )
            .subscribe();

This example uses exponentialBackoffWithJitter. The first argument is the firstBackoff duration; the second is maxBackoff, that is, the maxBackoffInterval. If it is null, it is equivalent to Duration.ofSeconds(Long.MAX_VALUE).
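To show how a first backoff and a maximum backoff interact, here is a minimal plain-Java sketch of a capped exponential schedule. BackoffSketch and delayFor are hypothetical names, the doubling factor is an assumption, and jitter is deliberately left out. It is not reactor-extra's implementation; only the null handling follows the description above.

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not part of reactor-extra. */
final class BackoffSketch {

    /**
     * Returns the un-jittered delay before the given retry (1-based):
     * firstBackoff doubled (iteration - 1) times, capped at maxBackoff.
     * A null maxBackoff is treated as Duration.ofSeconds(Long.MAX_VALUE).
     */
    static Duration delayFor(long iteration, Duration firstBackoff, Duration maxBackoff) {
        if (iteration < 1) {
            throw new IllegalArgumentException("iteration must be >= 1");
        }
        Duration cap = maxBackoff != null ? maxBackoff : Duration.ofSeconds(Long.MAX_VALUE);
        Duration delay = firstBackoff;
        for (long i = 1; i < iteration; i++) {
            // Once doubling would reach the cap, stop early; this also avoids overflow.
            if (delay.compareTo(cap.dividedBy(2)) >= 0) {
                return cap;
            }
            delay = delay.multipliedBy(2);
        }
        return delay.compareTo(cap) > 0 ? cap : delay;
    }

    public static void main(String[] args) {
        Duration first = Duration.ofMillis(100);
        Duration max = Duration.ofMillis(500);
        for (long i = 1; i <= 5; i++) {
            System.out.println("retry " + i + " waits " + delayFor(i, first, max).toMillis() + " ms");
        }
    }
}
```

With the values from the example above (100 ms and 500 ms), this schedule reaches the cap on the fourth retry. A jittered strategy randomizes the delays so that many clients do not all reconnect at the same instant.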

Retry

reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/Retry.java

    /**
     * Returns a retry function that retries errors resulting from all exceptions except
     * the specified non-retriable exceptions. More constraints may be added using
     * {@link #retryMax(int)} or {@link #timeout(Duration)}.
     *
     * @param nonRetriableExceptions exceptions that may not be retried
     * @return retry function that retries all exceptions except the specified non-retriable exceptions.
     */
    @SafeVarargs
    static <T> Retry<T> allBut(final Class<? extends Throwable>... nonRetriableExceptions) {
        Predicate<? super RetryContext<T>> predicate = context -> {
            Throwable exception = context.exception();
            if (exception == null)
                return true;
            for (Class<? extends Throwable> clazz : nonRetriableExceptions) {
                if (clazz.isInstance(exception))
                    return false;
            }
            return true;
        };
        return DefaultRetry.<T>create(predicate);
    }

As you can see, the Retry instance is created through DefaultRetry.
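The predicate built by allBut explains why the earlier examples retry the refused connection even though RuntimeException is excluded: java.net.ConnectException is a checked IOException, not a RuntimeException. The standalone sketch below reproduces only the isInstance check on a plain Throwable. AllButSketch is a hypothetical name, and unlike the real method it does not use RetryContext.

```java
import java.net.ConnectException;
import java.util.function.Predicate;

/** Hypothetical stand-in that mirrors only the predicate logic of Retry.allBut. */
final class AllButSketch {

    /** Returns a predicate that accepts every exception except the listed types. */
    @SafeVarargs
    static Predicate<Throwable> allBut(Class<? extends Throwable>... nonRetriable) {
        return exception -> {
            if (exception == null) {
                return true;
            }
            for (Class<? extends Throwable> clazz : nonRetriable) {
                if (clazz.isInstance(exception)) {
                    return false; // matches an excluded type (or a subclass of it)
                }
            }
            return true;
        };
    }

    public static void main(String[] args) {
        Predicate<Throwable> retriable = allBut(RuntimeException.class);
        // ConnectException is an IOException, so it passes the filter: prints "true"
        System.out.println(retriable.test(new ConnectException("Connection refused")));
        // IllegalStateException is a RuntimeException, so it is rejected: prints "false"
        System.out.println(retriable.test(new IllegalStateException("bug")));
    }
}
```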

reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/DefaultRetry.java

    public static <T> DefaultRetry<T> create(Predicate<? super RetryContext<T>> retryPredicate) {
        return new DefaultRetry<T>(retryPredicate,
                1,
                null,
                Backoff.zero(),
                Jitter.noJitter(),
                null,
                NOOP_ON_RETRY,
                (T) null);
    }

Note that maxIterations defaults to 1 here. If you do not call retryMax, the advanced retry strategy is effectively wasted, so this deserves extra attention.

Summary

The Retry utility class provided by Reactor Extra is very handy and well worth experimenting with.


Reposted from: http://tokml.baihongyu.com/
