序
本文主要研究一下flink的NetworkEnvironmentConfiguration
NetworkEnvironmentConfiguration
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
public class NetworkEnvironmentConfiguration { private final float networkBufFraction; private final long networkBufMin; private final long networkBufMax; private final int networkBufferSize; private final IOMode ioMode; private final int partitionRequestInitialBackoff; private final int partitionRequestMaxBackoff; private final int networkBuffersPerChannel; private final int floatingNetworkBuffersPerGate; private final NettyConfig nettyConfig; /** * Constructor for a setup with purely local communication (no netty). */ public NetworkEnvironmentConfiguration( float networkBufFraction, long networkBufMin, long networkBufMax, int networkBufferSize, IOMode ioMode, int partitionRequestInitialBackoff, int partitionRequestMaxBackoff, int networkBuffersPerChannel, int floatingNetworkBuffersPerGate) { this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize, ioMode, partitionRequestInitialBackoff, partitionRequestMaxBackoff, networkBuffersPerChannel, floatingNetworkBuffersPerGate, null); } public NetworkEnvironmentConfiguration( float networkBufFraction, long networkBufMin, long networkBufMax, int networkBufferSize, IOMode ioMode, int partitionRequestInitialBackoff, int partitionRequestMaxBackoff, int networkBuffersPerChannel, int floatingNetworkBuffersPerGate, @Nullable NettyConfig nettyConfig) { this.networkBufFraction = networkBufFraction; this.networkBufMin = networkBufMin; this.networkBufMax = networkBufMax; this.networkBufferSize = networkBufferSize; this.ioMode = ioMode; this.partitionRequestInitialBackoff = partitionRequestInitialBackoff; this.partitionRequestMaxBackoff = partitionRequestMaxBackoff; this.networkBuffersPerChannel = networkBuffersPerChannel; this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate; this.nettyConfig = nettyConfig; } // ------------------------------------------------------------------------ public float networkBufFraction() { return networkBufFraction; } public long networkBufMin() { return networkBufMin; } 
public long networkBufMax() { return networkBufMax; } public int networkBufferSize() { return networkBufferSize; } public IOMode ioMode() { return ioMode; } public int partitionRequestInitialBackoff() { return partitionRequestInitialBackoff; } public int partitionRequestMaxBackoff() { return partitionRequestMaxBackoff; } public int networkBuffersPerChannel() { return networkBuffersPerChannel; } public int floatingNetworkBuffersPerGate() { return floatingNetworkBuffersPerGate; } public NettyConfig nettyConfig() { return nettyConfig; } // ------------------------------------------------------------------------ @Override public int hashCode() { int result = 1; result = 31 * result + networkBufferSize; result = 31 * result + ioMode.hashCode(); result = 31 * result + partitionRequestInitialBackoff; result = 31 * result + partitionRequestMaxBackoff; result = 31 * result + networkBuffersPerChannel; result = 31 * result + floatingNetworkBuffersPerGate; result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0); return result; } @Override public boolean equals(Object obj) { if (this == obj) { return true; } else if (obj == null || getClass() != obj.getClass()) { return false; } else { final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj; return this.networkBufFraction == that.networkBufFraction && this.networkBufMin == that.networkBufMin && this.networkBufMax == that.networkBufMax && this.networkBufferSize == that.networkBufferSize && this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff && this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff && this.networkBuffersPerChannel == that.networkBuffersPerChannel && this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate && this.ioMode == that.ioMode && (nettyConfig != null ? 
nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null); } } @Override public String toString() { return "NetworkEnvironmentConfiguration{" + "networkBufFraction=" + networkBufFraction + ", networkBufMin=" + networkBufMin + ", networkBufMax=" + networkBufMax + ", networkBufferSize=" + networkBufferSize + ", ioMode=" + ioMode + ", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff + ", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff + ", networkBuffersPerChannel=" + networkBuffersPerChannel + ", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate + ", nettyConfig=" + nettyConfig + '}'; }}
- NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性
TaskManagerServicesConfiguration
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
public class TaskManagerServicesConfiguration { //...... /** * Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}. * * @param configuration to create the network environment configuration from * @param localTaskManagerCommunication true if task manager communication is local * @param taskManagerAddress address of the task manager * @param slots to start the task manager with * @return Network environment configuration */ @SuppressWarnings("deprecation") private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration( Configuration configuration, boolean localTaskManagerCommunication, InetAddress taskManagerAddress, int slots) throws Exception { // ----> hosts / ports for communication and data exchange int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), "Leave config parameter empty or use 0 to let the system choose a port automatically."); checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(), "Number of task slots must be at least one."); final int pageSize = checkedDownCast(MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes()); // check page size of for minimum size checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(), "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE); // check page size for power of two checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(), "Memory segment size must be a power of 2."); // network buffer memory fraction float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION); long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes(); long networkBufMax = 
MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes(); checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax); // fallback: number of network buffers final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); checkNetworkConfigOld(numNetworkBuffers); if (!hasNewNetworkBufConf(configuration)) { // map old config to new one: networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize; } else { if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { LOG.info("Ignoring old (but still present) network buffer configuration via {}.", TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); } } final NettyConfig nettyConfig; if (!localTaskManagerCommunication) { final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration); } else { nettyConfig = null; } // Default spill I/O mode for intermediate results final String syncOrAsync = configuration.getString( ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE); final IOManager.IOMode ioMode; if (syncOrAsync.equals("async")) { ioMode = IOManager.IOMode.ASYNC; } else { ioMode = IOManager.IOMode.SYNC; } int initialRequestBackoff = configuration.getInteger( TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); int maxRequestBackoff = configuration.getInteger( TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); int buffersPerChannel = configuration.getInteger( TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); int extraBuffersPerGate = configuration.getInteger( TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); return new NetworkEnvironmentConfiguration( networkBufFraction, networkBufMin, networkBufMax, pageSize, ioMode, initialRequestBackoff, maxRequestBackoff, 
buffersPerChannel, extraBuffersPerGate, nettyConfig); } //......}
- TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置
TaskManagerOptions
flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@PublicEvolvingpublic class TaskManagerOptions { //...... /** * Size of memory buffers used by the network stack and the memory manager. */ public static final ConfigOptionMEMORY_SEGMENT_SIZE = key("taskmanager.memory.segment-size") .defaultValue("32kb") .withDescription("Size of memory buffers used by the network stack and the memory manager."); /** * Fraction of JVM memory to use for network buffers. */ public static final ConfigOption NETWORK_BUFFERS_MEMORY_FRACTION = key("taskmanager.network.memory.fraction") .defaultValue(0.1f) .withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" + " data exchange channels a TaskManager can have at the same time and how well buffered the channels" + " are. If a job is rejected or you get a warning that the system has not enough buffers available," + " increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" + "` and \"taskmanager.network.memory.max\" may override this fraction."); /** * Minimum memory size for network buffers. */ public static final ConfigOption NETWORK_BUFFERS_MEMORY_MIN = key("taskmanager.network.memory.min") .defaultValue("64mb") .withDescription("Minimum memory size for network buffers."); /** * Maximum memory size for network buffers. */ public static final ConfigOption NETWORK_BUFFERS_MEMORY_MAX = key("taskmanager.network.memory.max") .defaultValue("1gb") .withDescription("Maximum memory size for network buffers."); /** * Number of buffers used in the network stack. This defines the number of possible tasks and * shuffles. * * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN}, * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead */ @Deprecated public static final ConfigOption NETWORK_NUM_BUFFERS = key("taskmanager.network.numberOfBuffers") .defaultValue(2048); /** * Minimum backoff for partition requests of input channels. 
*/ public static final ConfigOption NETWORK_REQUEST_BACKOFF_INITIAL = key("taskmanager.network.request-backoff.initial") .defaultValue(100) .withDeprecatedKeys("taskmanager.net.request-backoff.initial") .withDescription("Minimum backoff in milliseconds for partition requests of input channels."); /** * Maximum backoff for partition requests of input channels. */ public static final ConfigOption NETWORK_REQUEST_BACKOFF_MAX = key("taskmanager.network.request-backoff.max") .defaultValue(10000) .withDeprecatedKeys("taskmanager.net.request-backoff.max") .withDescription("Maximum backoff in milliseconds for partition requests of input channels."); /** * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). * * Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization. */ public static final ConfigOption
NETWORK_BUFFERS_PER_CHANNEL = key("taskmanager.network.memory.buffers-per-channel") .defaultValue(2) .withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." + "In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" + " configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" + " for parallel serialization."); /** * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). */ public static final ConfigOption NETWORK_EXTRA_BUFFERS_PER_GATE = key("taskmanager.network.memory.floating-buffers-per-gate") .defaultValue(8) .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." + " In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." + " The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" + " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" + " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster."); //......}
- taskmanager.memory.segment-size指定memory segment的大小,默认为32kb;taskmanager.network.memory.fraction指定network buffers使用的memory的比例,默认为0.1;taskmanager.network.memory.min指定network buffers使用的最小内存,默认为64mb;taskmanager.network.memory.max指定network buffers使用的最大内存,默认为1gb;taskmanager.network.numberOfBuffers指定network使用的buffers数量,默认为2048,该配置已经被废弃,使用taskmanager.network.memory.fraction、taskmanager.network.memory.min、taskmanager.network.memory.max这几个配置来替代
- taskmanager.network.request-backoff.initial指定input channels的partition requests的最小backoff时间(毫秒),默认为100;taskmanager.network.request-backoff.max指定input channels的partition requests的最大backoff时间(毫秒),默认为10000
- taskmanager.network.memory.buffers-per-channel指定每个outgoing/incoming channel使用的buffers数量,默认为2;taskmanager.network.memory.floating-buffers-per-gate指定每个outgoing/incoming gate额外使用的(floating)buffers数量,默认为8
NettyConfig
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
public class NettyConfig { private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class); // - Config keys ---------------------------------------------------------- public static final ConfigOptionNUM_ARENAS = ConfigOptions .key("taskmanager.network.netty.num-arenas") .defaultValue(-1) .withDeprecatedKeys("taskmanager.net.num-arenas") .withDescription("The number of Netty arenas."); public static final ConfigOption NUM_THREADS_SERVER = ConfigOptions .key("taskmanager.network.netty.server.numThreads") .defaultValue(-1) .withDeprecatedKeys("taskmanager.net.server.numThreads") .withDescription("The number of Netty server threads."); public static final ConfigOption NUM_THREADS_CLIENT = ConfigOptions .key("taskmanager.network.netty.client.numThreads") .defaultValue(-1) .withDeprecatedKeys("taskmanager.net.client.numThreads") .withDescription("The number of Netty client threads."); public static final ConfigOption CONNECT_BACKLOG = ConfigOptions .key("taskmanager.network.netty.server.backlog") .defaultValue(0) // default: 0 => Netty's default .withDeprecatedKeys("taskmanager.net.server.backlog") .withDescription("The netty server connection backlog."); public static final ConfigOption CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions .key("taskmanager.network.netty.client.connectTimeoutSec") .defaultValue(120) // default: 120s = 2min .withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec") .withDescription("The Netty client connection timeout."); public static final ConfigOption SEND_RECEIVE_BUFFER_SIZE = ConfigOptions .key("taskmanager.network.netty.sendReceiveBufferSize") .defaultValue(0) // default: 0 => Netty's default .withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize") .withDescription("The Netty send and receive buffer size. 
This defaults to the system buffer size" + " (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux."); public static final ConfigOption TRANSPORT_TYPE = ConfigOptions .key("taskmanager.network.netty.transport") .defaultValue("nio") .withDeprecatedKeys("taskmanager.net.transport") .withDescription("The Netty transport type, either \"nio\" or \"epoll\""); // ------------------------------------------------------------------------ enum TransportType { NIO, EPOLL, AUTO } static final String SERVER_THREAD_GROUP_NAME = "Flink Netty Server"; static final String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client"; private final InetAddress serverAddress; private final int serverPort; private final int memorySegmentSize; private final int numberOfSlots; private final Configuration config; // optional configuration public NettyConfig( InetAddress serverAddress, int serverPort, int memorySegmentSize, int numberOfSlots, Configuration config) { this.serverAddress = checkNotNull(serverAddress); checkArgument(serverPort >= 0 && serverPort <= 65536, "Invalid port number."); this.serverPort = serverPort; checkArgument(memorySegmentSize > 0, "Invalid memory segment size."); this.memorySegmentSize = memorySegmentSize; checkArgument(numberOfSlots > 0, "Number of slots"); this.numberOfSlots = numberOfSlots; this.config = checkNotNull(config); LOG.info(this.toString()); } InetAddress getServerAddress() { return serverAddress; } int getServerPort() { return serverPort; } int getMemorySegmentSize() { return memorySegmentSize; } public int getNumberOfSlots() { return numberOfSlots; } // ------------------------------------------------------------------------ // Getters // ------------------------------------------------------------------------ public int getServerConnectBacklog() { return config.getInteger(CONNECT_BACKLOG); } public int getNumberOfArenas() { // default: number of slots final int configValue = config.getInteger(NUM_ARENAS); return configValue == -1 ? 
numberOfSlots : configValue; } public int getServerNumThreads() { // default: number of task slots final int configValue = config.getInteger(NUM_THREADS_SERVER); return configValue == -1 ? numberOfSlots : configValue; } public int getClientNumThreads() { // default: number of task slots final int configValue = config.getInteger(NUM_THREADS_CLIENT); return configValue == -1 ? numberOfSlots : configValue; } public int getClientConnectTimeoutSeconds() { return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS); } public int getSendAndReceiveBufferSize() { return config.getInteger(SEND_RECEIVE_BUFFER_SIZE); } public TransportType getTransportType() { String transport = config.getString(TRANSPORT_TYPE); switch (transport) { case "nio": return TransportType.NIO; case "epoll": return TransportType.EPOLL; default: return TransportType.AUTO; } } @Nullable public SSLHandlerFactory createClientSSLEngineFactory() throws Exception { return getSSLEnabled() ? SSLUtils.createInternalClientSSLEngineFactory(config) : null; } @Nullable public SSLHandlerFactory createServerSSLEngineFactory() throws Exception { return getSSLEnabled() ? 
SSLUtils.createInternalServerSSLEngineFactory(config) : null; } public boolean getSSLEnabled() { return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED) && SSLUtils.isInternalSSLEnabled(config); } public boolean isCreditBasedEnabled() { return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL); } public Configuration getConfig() { return config; } @Override public String toString() { String format = "NettyConfig [" + "server address: %s, " + "server port: %d, " + "ssl enabled: %s, " + "memory segment size (bytes): %d, " + "transport type: %s, " + "number of server threads: %d (%s), " + "number of client threads: %d (%s), " + "server connect backlog: %d (%s), " + "client connect timeout (sec): %d, " + "send/receive buffer size (bytes): %d (%s)]"; String def = "use Netty's default"; String man = "manual"; return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true" : "false", memorySegmentSize, getTransportType(), getServerNumThreads(), getServerNumThreads() == 0 ? def : man, getClientNumThreads(), getClientNumThreads() == 0 ? def : man, getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man, getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(), getSendAndReceiveBufferSize() == 0 ? def : man); }}
- NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置
- taskmanager.network.netty.server.backlog用于指定netty server的connection backlog,默认值为0即使用netty默认的配置;taskmanager.network.netty.client.connectTimeoutSec指定netty client的connection timeout,默认为120(单位秒);taskmanager.network.netty.sendReceiveBufferSize指定netty send/receive buffer大小,默认为0即使用netty的默认配置,默认是使用system buffer size,即/proc/sys/net/ipv4/tcp_[rw]mem的配置;taskmanager.network.netty.transport指定的是netty transport的类型,默认是nio
- taskmanager.network.netty.num-arenas指定的是netty arenas的数量,默认为-1;taskmanager.network.netty.server.numThreads指定的是netty server的threads数量,默认为-1;taskmanager.network.netty.client.numThreads指定的是netty client的threads数量,默认为-1;这几个配置当配置值为-1的时候,对应get方法返回的是numberOfSlots值
小结
- NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性
- TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置
- NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置