NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.option(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, false)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
}
}).remoteAddress("127.0.0.1", 6666);
try {
ChannelFuture future = bootstrap.connect();
Channel channel = future.channel();
channel.writeAndFlush("44"); // 建立连接后发送消息
System.out.println("send ");// 打印
channel.closeFuture();// 使用完后,关闭channel
} finally {
eventExecutors.shutdownGracefully();
}
运行上面代码,可能出现:客户端程序运行一段时间自动退出。
try {
ChannelFuture future = bootstrap.connect();
// sync,同步阻塞。因为必须要得且channel建立连接后才能后续操作,所以阻塞即可
Channel channel = future.sync().channel();
channel.writeAndFlush("44");
System.out.println("send ");
// 这里可以阻塞main线程,或者 异步等待结果,main线程可以去处理其他事情。
channel.closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
或者
ChannelFuture future = bootstrap.connect();
Channel channel = future.sync().channel();
channel.writeAndFlush("44");
System.out.println("send ");
// 监听关闭事件。一旦channel关闭,就释放池子
channel.closeFuture().addListener(future1 -> {
eventExecutors.shutdownGracefully();
});
// main线程可以做其他事情。。
不知道你是不是碰到过这样的场景?
和服务端扯皮时,你拿着调用链路图给他看,你说你们接口是10s,服务端把他们的日志拿出来,一摊手:就3s啊,没准是网络波动了。你看他又把锅推到了网络上了。下图。
然后服务端认了,马上整改。可是写代码的是外包仔,对netty不熟悉。
他埋点这么做的:
ctw.writeAndFlush(xx); //外包仔认为此处已经发送出去了。
记录埋点;
调用 writeAndFlush 并不代表消息已经发送到网络上,它仅仅是一个异步的消息发送操作,调用 writeAndFlush 之后,Netty 会执行一系列操作,最终将消息发送到网络上。所以要监听他的结果。
后来经过毒打,终于正确了:(注意执行andListener监听的线程是NIO线程,也就是NIO线程记录的埋点。)
future = ctw.writeAndFlush(xx);
future.addListener(f -> {
记录埋点;
});
netty为了方便多线程交互,写了一个类叫promise,他和java的future一样。
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
// 让nio线程去通知listener
DefaultPromise<String> promise = new DefaultPromise<>(group.next());
promise.addListener(future -> {
// 当前线程应该是NIO线程。
System.out.println(Thread.currentThread().getName() + "promise listener:" + future.get());
});
new Thread(() -> {
try {
String result = promise.get();
System.out.println(Thread.currentThread().getName() + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
promise.cause().printStackTrace();
}
}).start();
new Thread(() -> {
try {
String result = promise.get();
System.out.println(Thread.currentThread().getName() + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
promise.cause().printStackTrace();
}
}).start();
Thread.sleep(1000);
System.out.println("其他线程都被阻塞了。main没有被阻塞。只有调用了setSuccess后,其他阻塞的线程才能成功");
System.out.println(promise.isSuccess());
promise.setSuccess("success");
System.out.println(promise.isSuccess());
}
promise.get和listener,都会被阻塞直到promise被赋值。
先分析addlistener.
如何做到的。看promise.setSuccess(“success”);源码。
发现用AtomicReferenceFieldUpdater把promise里面的result升级为原子操作。然后用CAS看result是否赋值过。一旦赋值成功,就去调用notifyListeners唤醒。
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
一直到唤醒的底层。循环去调用operationComplete接口。
奥原来如此,addListener时,每个listener都要实现operationComplete接口。
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
那promise.get呢?
首先竞争promise锁,竞争成功就去循环wait。其他没有竞争成功的就去阻塞队列等待,进入blocked状态。
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait(0);
} finally {
decWaiters();
}
}
}
等等,进入blocked的线程那谁去唤醒他啊?还记得checkNotifyWaiters方法吗?
它里面不仅判读了get的调用次数,还判断了是否有listener。
如果有get调用的话,他就是notifyAll,唤醒那些竞争锁失败的线程。
private synchronized boolean checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
return listeners != null;
}
参考:netty进阶指南-李林峰
本文作者:WKP9418
本文连接:https:///qq_43179428/article/details/140575847
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- awee.cn 版权所有 湘ICP备2023022495号-5
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务