A Look at Flink's ConnectionManager
Published: 2019-06-28


This post takes a look at Flink's ConnectionManager.

ConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java

public interface ConnectionManager {

    void start(ResultPartitionProvider partitionProvider,
                TaskEventDispatcher taskEventDispatcher) throws IOException;

    /**
     * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
     */
    PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException;

    /**
     * Closes opened ChannelConnections in case of a resource release.
     */
    void closeOpenChannelConnections(ConnectionID connectionId);

    int getNumberOfActiveConnections();

    int getDataPort();

    void shutdown() throws IOException;
}
  • ConnectionManager defines methods such as start, shutdown, and closeOpenChannelConnections for managing physical connections. It is an interface with two implementations: LocalConnectionManager and NettyConnectionManager.

LocalConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java

public class LocalConnectionManager implements ConnectionManager {

    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
    }

    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
        return null;
    }

    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {}

    @Override
    public int getNumberOfActiveConnections() {
        return 0;
    }

    @Override
    public int getDataPort() {
        return -1;
    }

    @Override
    public void shutdown() {}
}
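  • LocalConnectionManager implements the ConnectionManager interface, but its methods are essentially no-ops: createPartitionRequestClient returns null, getNumberOfActiveConnections returns 0, and getDataPort returns -1.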
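
Since the local implementation answers with sentinel values (null, 0, -1) rather than throwing, callers have to check for them. The following is a minimal standalone sketch of that no-op pattern. SimpleConnectionManager, NoOpConnectionManager, and the describe helper are hypothetical names invented for this illustration; they are not Flink classes, and the interface is trimmed to two methods.

```java
// Standalone illustration; none of these types are Flink classes.
class NoOpConnectionManagerDemo {

    /** Callers must check for the sentinel value before using the port. */
    static String describe(SimpleConnectionManager manager) {
        int port = manager.getDataPort();
        return port < 0 ? "no data port (local only)" : "data port " + port;
    }

    public static void main(String[] args) {
        System.out.println(describe(new NoOpConnectionManager()));
    }
}

/** Hypothetical, trimmed-down stand-in for the ConnectionManager interface. */
interface SimpleConnectionManager {

    /** Returns a client handle, or null when there are no remote connections. */
    String createClient(String remoteAddress);

    /** Returns the bound data port, or -1 when nothing is listening. */
    int getDataPort();
}

/** Mirrors the shape of LocalConnectionManager: every method does nothing. */
class NoOpConnectionManager implements SimpleConnectionManager {

    @Override
    public String createClient(String remoteAddress) {
        return null;
    }

    @Override
    public int getDataPort() {
        return -1;
    }
}
```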

NettyConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java

public class NettyConnectionManager implements ConnectionManager {

    private final NettyServer server;

    private final NettyClient client;

    private final NettyBufferPool bufferPool;

    private final PartitionRequestClientFactory partitionRequestClientFactory;

    public NettyConnectionManager(NettyConfig nettyConfig) {
        this.server = new NettyServer(nettyConfig);
        this.client = new NettyClient(nettyConfig);
        this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

        this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
    }

    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
        NettyProtocol partitionRequestProtocol = new NettyProtocol(
            partitionProvider,
            taskEventDispatcher,
            client.getConfig().isCreditBasedEnabled());

        client.init(partitionRequestProtocol, bufferPool);
        server.init(partitionRequestProtocol, bufferPool);
    }

    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
            throws IOException, InterruptedException {
        return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
    }

    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {
        partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
    }

    @Override
    public int getNumberOfActiveConnections() {
        return partitionRequestClientFactory.getNumberOfActiveClients();
    }

    @Override
    public int getDataPort() {
        if (server != null && server.getLocalAddress() != null) {
            return server.getLocalAddress().getPort();
        } else {
            return -1;
        }
    }

    @Override
    public void shutdown() {
        client.shutdown();
        server.shutdown();
    }

    NettyClient getClient() {
        return client;
    }

    NettyServer getServer() {
        return server;
    }

    NettyBufferPool getBufferPool() {
        return bufferPool;
    }
}
  • NettyConnectionManager implements the ConnectionManager interface. Its constructor uses NettyConfig to create a NettyServer, a NettyClient, and a NettyBufferPool, and uses the NettyClient to create a PartitionRequestClientFactory.
  • The start method creates a NettyProtocol and initializes the NettyClient and then the NettyServer with it. The shutdown method shuts down the NettyClient and the NettyServer. closeOpenChannelConnections delegates to partitionRequestClientFactory.closeOpenChannelConnections for the given connectionId.
  • The createPartitionRequestClient method delegates to partitionRequestClientFactory.createPartitionRequestClient to create the PartitionRequestClient.
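
The lifecycle described in the bullets above (one protocol shared by client and server, client initialized first, both shut down together) can be sketched without any Netty dependency. This is only an illustration under stated assumptions: Endpoint and FacadeManager are hypothetical stand-ins for NettyClient/NettyServer and NettyConnectionManager, and the protocol is reduced to a plain string.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone illustration; Endpoint and FacadeManager are hypothetical stand-ins.
class ManagerLifecycleSketch {
    public static void main(String[] args) {
        FacadeManager manager = new FacadeManager();
        manager.start();
        manager.shutdown();
        System.out.println(manager.events());
    }
}

/** Stands in for NettyClient / NettyServer and records what was called on it. */
class Endpoint {
    private final String name;
    private final List<String> log;

    Endpoint(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    void init(String protocol) {
        log.add(name + ".init(" + protocol + ")");
    }

    void shutdown() {
        log.add(name + ".shutdown");
    }
}

/** Owns both endpoints and forwards lifecycle calls, like the manager above. */
class FacadeManager {
    private final List<String> log = new ArrayList<>();
    private final Endpoint client = new Endpoint("client", log);
    private final Endpoint server = new Endpoint("server", log);

    void start() {
        // A single protocol value is handed to both sides of the connection.
        String protocol = "partition-request";
        client.init(protocol);
        server.init(protocol);
    }

    void shutdown() {
        client.shutdown();
        server.shutdown();
    }

    List<String> events() {
        return log;
    }
}
```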

PartitionRequestClientFactory

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java

class PartitionRequestClientFactory {

    private final NettyClient nettyClient;

    private final ConcurrentMap<ConnectionID, Object> clients = new ConcurrentHashMap<ConnectionID, Object>();

    PartitionRequestClientFactory(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    /**
     * Atomically establishes a TCP connection to the given remote address and
     * creates a {@link PartitionRequestClient} instance for this connection.
     */
    PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
        Object entry;
        PartitionRequestClient client = null;

        while (client == null) {
            entry = clients.get(connectionId);

            if (entry != null) {
                // Existing channel or connecting channel
                if (entry instanceof PartitionRequestClient) {
                    client = (PartitionRequestClient) entry;
                }
                else {
                    ConnectingChannel future = (ConnectingChannel) entry;
                    client = future.waitForChannel();

                    clients.replace(connectionId, future, client);
                }
            }
            else {
                // No channel yet. Create one, but watch out for a race.
                // We create a "connecting future" and atomically add it to the map.
                // Only the thread that really added it establishes the channel.
                // The others need to wait on that original establisher's future.
                ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this);
                Object old = clients.putIfAbsent(connectionId, connectingChannel);

                if (old == null) {
                    nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);

                    client = connectingChannel.waitForChannel();

                    clients.replace(connectionId, connectingChannel, client);
                }
                else if (old instanceof ConnectingChannel) {
                    client = ((ConnectingChannel) old).waitForChannel();

                    clients.replace(connectionId, old, client);
                }
                else {
                    client = (PartitionRequestClient) old;
                }
            }

            // Make sure to increment the reference count before handing a client
            // out to ensure correct bookkeeping for channel closing.
            if (!client.incrementReferenceCounter()) {
                destroyPartitionRequestClient(connectionId, client);
                client = null;
            }
        }

        return client;
    }

    public void closeOpenChannelConnections(ConnectionID connectionId) {
        Object entry = clients.get(connectionId);

        if (entry instanceof ConnectingChannel) {
            ConnectingChannel channel = (ConnectingChannel) entry;

            if (channel.dispose()) {
                clients.remove(connectionId, channel);
            }
        }
    }

    int getNumberOfActiveClients() {
        return clients.size();
    }

    /**
     * Removes the client for the given {@link ConnectionID}.
     */
    void destroyPartitionRequestClient(ConnectionID connectionId, PartitionRequestClient client) {
        clients.remove(connectionId, client);
    }

    //......
}
  • The PartitionRequestClientFactory constructor takes a NettyClient. The factory keeps an in-memory ConcurrentHashMap that maps each ConnectionID to either a PartitionRequestClient or a ConnectingChannel.
  • createPartitionRequestClient first looks up the ConnectionID in the ConcurrentHashMap. If the entry is a PartitionRequestClient, it is used directly. If the entry is a ConnectingChannel, the method calls ConnectingChannel.waitForChannel to wait for the PartitionRequestClient and then replaces the map entry with that client. If there is no entry, it creates a ConnectingChannel and adds it with putIfAbsent, which returns the old value: if old is null, this thread calls nettyClient.connect, waits for the PartitionRequestClient, and replaces the map entry; if old is a ConnectingChannel, it waits on that channel and replaces the map entry; otherwise old is already a PartitionRequestClient. Before returning, the method calls client.incrementReferenceCounter(); if the increment fails, it calls destroyPartitionRequestClient, resets client to null, and the while loop tries again, so the method only returns once the increment succeeds.
  • closeOpenChannelConnections only acts when the entry is a ConnectingChannel: it calls ConnectingChannel.dispose and, if that succeeds, removes the entry from the ConcurrentHashMap.
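
The two ideas in createPartitionRequestClient, "only the thread that wins putIfAbsent connects" and "retry when the reference count cannot be incremented", can be tried out in a small standalone sketch. Everything here is an assumption made for illustration: ClientCacheSketch and CachedClient are hypothetical names, a CompletableFuture stands in for ConnectingChannel, the connect step completes immediately instead of going through Netty, and the AtomicInteger-based counter is a simplification rather than Flink's actual reference counter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone illustration; not Flink code.
class ClientCacheSketch {
    // Each value is either a ready CachedClient or a pending CompletableFuture.
    private final ConcurrentMap<String, Object> clients = new ConcurrentHashMap<>();
    final AtomicInteger connectAttempts = new AtomicInteger();

    CachedClient getOrCreate(String address) {
        CachedClient client = null;
        while (client == null) {
            Object entry = clients.get(address);
            if (entry == null) {
                // Try to become the establisher by atomically publishing a future.
                CompletableFuture<CachedClient> mine = new CompletableFuture<>();
                entry = clients.putIfAbsent(address, mine);
                if (entry == null) {
                    // We won the race: "connect" (here it completes immediately).
                    connectAttempts.incrementAndGet();
                    mine.complete(new CachedClient(address));
                    entry = mine;
                }
            }
            if (entry instanceof CachedClient) {
                client = (CachedClient) entry;
            } else {
                @SuppressWarnings("unchecked")
                CompletableFuture<CachedClient> pending = (CompletableFuture<CachedClient>) entry;
                client = pending.join();
                // Swap the future for the finished client; harmless if already done.
                clients.replace(address, pending, client);
            }
            // A disposed client cannot be retained: drop it and loop again.
            if (!client.retain()) {
                clients.remove(address, client);
                client = null;
            }
        }
        return client;
    }

    public static void main(String[] args) {
        ClientCacheSketch cache = new ClientCacheSketch();
        CachedClient first = cache.getOrCreate("tm-1:6121");
        CachedClient second = cache.getOrCreate("tm-1:6121");
        System.out.println("same instance: " + (first == second));
        System.out.println("connect attempts: " + cache.connectAttempts.get());
    }
}

/** Simplified client with a reference count; -1 marks it as disposed. */
class CachedClient {
    final String address;
    private final AtomicInteger refCount = new AtomicInteger(0);

    CachedClient(String address) {
        this.address = address;
    }

    boolean retain() {
        int current;
        do {
            current = refCount.get();
            if (current < 0) {
                return false;
            }
        } while (!refCount.compareAndSet(current, current + 1));
        return true;
    }

    void release() {
        refCount.decrementAndGet();
    }

    boolean disposeIfNotUsed() {
        return refCount.compareAndSet(0, -1);
    }
}
```

Storing the pending future in the same map as the finished client is what lets late callers wait on the first caller's connection attempt instead of opening a second one.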

ConnectingChannel

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java

private static final class ConnectingChannel implements ChannelFutureListener {

    private final Object connectLock = new Object();

    private final ConnectionID connectionId;

    private final PartitionRequestClientFactory clientFactory;

    private boolean disposeRequestClient = false;

    public ConnectingChannel(ConnectionID connectionId, PartitionRequestClientFactory clientFactory) {
        this.connectionId = connectionId;
        this.clientFactory = clientFactory;
    }

    private boolean dispose() {
        boolean result;
        synchronized (connectLock) {
            if (partitionRequestClient != null) {
                result = partitionRequestClient.disposeIfNotUsed();
            }
            else {
                disposeRequestClient = true;
                result = true;
            }

            connectLock.notifyAll();
        }

        return result;
    }

    private void handInChannel(Channel channel) {
        synchronized (connectLock) {
            try {
                NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
                partitionRequestClient = new PartitionRequestClient(
                    channel, clientHandler, connectionId, clientFactory);

                if (disposeRequestClient) {
                    partitionRequestClient.disposeIfNotUsed();
                }

                connectLock.notifyAll();
            }
            catch (Throwable t) {
                notifyOfError(t);
            }
        }
    }

    private volatile PartitionRequestClient partitionRequestClient;

    private volatile Throwable error;

    private PartitionRequestClient waitForChannel() throws IOException, InterruptedException {
        synchronized (connectLock) {
            while (error == null && partitionRequestClient == null) {
                connectLock.wait(2000);
            }
        }

        if (error != null) {
            throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
        }

        return partitionRequestClient;
    }

    private void notifyOfError(Throwable error) {
        synchronized (connectLock) {
            this.error = error;
            connectLock.notifyAll();
        }
    }

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            handInChannel(future.channel());
        }
        else if (future.cause() != null) {
            notifyOfError(new RemoteTransportException(
                    "Connecting to remote task manager + '" + connectionId.getAddress() +
                            "' has failed. This might indicate that the remote task " +
                            "manager has been lost.",
                    connectionId.getAddress(), future.cause()));
        }
        else {
            notifyOfError(new LocalTransportException(
                String.format(
                    "Connecting to remote task manager '%s' has been cancelled.",
                    connectionId.getAddress()),
                null));
        }
    }
}
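  • ConnectingChannel implements Netty's ChannelFutureListener interface. When the ChannelFuture succeeds, operationComplete calls handInChannel, which creates the PartitionRequestClient; on failure or cancellation it calls notifyOfError instead. waitForChannel blocks until the partitionRequestClient has been created and returns it, or throws an IOException if an error was recorded.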
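
The hand-off between the Netty callback thread and the waiting task thread is a classic guarded wait on a shared lock. Below is a minimal standalone sketch of that pattern. ConnectingHandoff and its methods are hypothetical names, a String stands in for the PartitionRequestClient, the fields are guarded by the lock instead of being volatile, and the exceptions are wrapped in unchecked types to keep the example short, which differs from the checked IOException and InterruptedException in the original.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Standalone illustration; not Flink code.
class ConnectingHandoffDemo {
    public static void main(String[] args) {
        ConnectingHandoff handoff = new ConnectingHandoff();
        // Plays the role of the callback thread that completes the connection.
        Thread connector = new Thread(() -> handoff.handIn("channel-to-tm-1"));
        connector.start();
        System.out.println(handoff.await());
    }
}

/** One-shot hand-off of a result or an error from one thread to waiting threads. */
class ConnectingHandoff {
    private final Object lock = new Object();
    private String result;    // guarded by lock
    private Throwable error;  // guarded by lock

    /** Called by the producer on success; wakes up every waiter. */
    void handIn(String value) {
        synchronized (lock) {
            result = value;
            lock.notifyAll();
        }
    }

    /** Called by the producer on failure; wakes up every waiter. */
    void fail(Throwable cause) {
        synchronized (lock) {
            error = cause;
            lock.notifyAll();
        }
    }

    /** Blocks until a result or an error has been handed in. */
    String await() {
        synchronized (lock) {
            // Loop to survive spurious wakeups; the timeout only bounds one wait.
            while (error == null && result == null) {
                try {
                    lock.wait(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting", e);
                }
            }
            if (error != null) {
                throw new UncheckedIOException(
                    new IOException("Connecting the channel failed: " + error.getMessage(), error));
            }
            return result;
        }
    }
}
```

The while loop around wait matters: the condition is rechecked after every wakeup, so a timeout or a spurious wakeup never returns a half-initialized result.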

Summary

  • ConnectionManager defines methods such as start, shutdown, and closeOpenChannelConnections for managing physical connections. It is an interface with two implementations: LocalConnectionManager and NettyConnectionManager.
  • LocalConnectionManager implements the ConnectionManager interface, but its methods are essentially no-ops. NettyConnectionManager also implements the interface: its constructor uses NettyConfig to create a NettyServer, a NettyClient, and a NettyBufferPool, and uses the NettyClient to create a PartitionRequestClientFactory. Its start method creates a NettyProtocol and initializes the NettyClient and NettyServer, and its shutdown method shuts both down. closeOpenChannelConnections and createPartitionRequestClient both delegate to the same-named methods on partitionRequestClientFactory.
  • The PartitionRequestClientFactory constructor takes a NettyClient. The factory keeps an in-memory ConcurrentHashMap that maps each ConnectionID to either a PartitionRequestClient or a ConnectingChannel. ConnectingChannel implements Netty's ChannelFutureListener interface: when the ChannelFuture succeeds, operationComplete calls handInChannel, which creates the PartitionRequestClient, and waitForChannel blocks until the partitionRequestClient has been created and then returns it.

