博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊flink的NetworkEnvironmentConfiguration
阅读量:6379 次
发布时间:2019-06-23

本文共 25737 字,大约阅读时间需要 85 分钟。

本文主要研究一下flink的NetworkEnvironmentConfiguration

NetworkEnvironmentConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java

public class NetworkEnvironmentConfiguration {    private final float networkBufFraction;    private final long networkBufMin;    private final long networkBufMax;    private final int networkBufferSize;    private final IOMode ioMode;    private final int partitionRequestInitialBackoff;    private final int partitionRequestMaxBackoff;    private final int networkBuffersPerChannel;    private final int floatingNetworkBuffersPerGate;    private final NettyConfig nettyConfig;    /**     * Constructor for a setup with purely local communication (no netty).     */    public NetworkEnvironmentConfiguration(            float networkBufFraction,            long networkBufMin,            long networkBufMax,            int networkBufferSize,            IOMode ioMode,            int partitionRequestInitialBackoff,            int partitionRequestMaxBackoff,            int networkBuffersPerChannel,            int floatingNetworkBuffersPerGate) {        this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,                ioMode,                partitionRequestInitialBackoff, partitionRequestMaxBackoff,                networkBuffersPerChannel, floatingNetworkBuffersPerGate,                null);            }    public NetworkEnvironmentConfiguration(            float networkBufFraction,            long networkBufMin,            long networkBufMax,            int networkBufferSize,            IOMode ioMode,            int partitionRequestInitialBackoff,            int partitionRequestMaxBackoff,            int networkBuffersPerChannel,            int floatingNetworkBuffersPerGate,            @Nullable NettyConfig nettyConfig) {        this.networkBufFraction = networkBufFraction;        this.networkBufMin = networkBufMin;        this.networkBufMax = networkBufMax;        this.networkBufferSize = networkBufferSize;        this.ioMode = ioMode;        this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;        this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;        this.networkBuffersPerChannel = networkBuffersPerChannel;        this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;        this.nettyConfig = nettyConfig;    }    // ------------------------------------------------------------------------    public float networkBufFraction() {        return networkBufFraction;    }    public long networkBufMin() {        return networkBufMin;    }    public long networkBufMax() {        return networkBufMax;    }    public int networkBufferSize() {        return networkBufferSize;    }    public IOMode ioMode() {        return ioMode;    }    public int partitionRequestInitialBackoff() {        return partitionRequestInitialBackoff;    }    public int partitionRequestMaxBackoff() {        return partitionRequestMaxBackoff;    }    public int networkBuffersPerChannel() {        return networkBuffersPerChannel;    }    public int floatingNetworkBuffersPerGate() {        return floatingNetworkBuffersPerGate;    }    public NettyConfig nettyConfig() {        return nettyConfig;    }    // ------------------------------------------------------------------------    @Override    public int hashCode() {        int result = 1;        result = 31 * result + networkBufferSize;        result = 31 * result + ioMode.hashCode();        result = 31 * result + partitionRequestInitialBackoff;        result = 31 * result + partitionRequestMaxBackoff;        result = 31 * result + networkBuffersPerChannel;        result = 31 * result + floatingNetworkBuffersPerGate;        result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);        return result;    }    @Override    public boolean equals(Object obj) {        if (this == obj) {            return true;        }        else if (obj == null || getClass() != obj.getClass()) {            return false;        }        else {            final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj;            return this.networkBufFraction == that.networkBufFraction &&                    this.networkBufMin == that.networkBufMin &&                    this.networkBufMax == that.networkBufMax &&                    this.networkBufferSize == that.networkBufferSize &&                    this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff &&                    this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&                    this.networkBuffersPerChannel == that.networkBuffersPerChannel &&                    this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&                    this.ioMode == that.ioMode &&                     (nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);        }    }    @Override    public String toString() {        return "NetworkEnvironmentConfiguration{" +                "networkBufFraction=" + networkBufFraction +                ", networkBufMin=" + networkBufMin +                ", networkBufMax=" + networkBufMax +                ", networkBufferSize=" + networkBufferSize +                ", ioMode=" + ioMode +                ", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +                ", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +                ", networkBuffersPerChannel=" + networkBuffersPerChannel +                ", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +                ", nettyConfig=" + nettyConfig +                '}';    }}
  • NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性

TaskManagerServicesConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

public class TaskManagerServicesConfiguration {    //......    /**     * Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}.     *     * @param configuration to create the network environment configuration from     * @param localTaskManagerCommunication true if task manager communication is local     * @param taskManagerAddress address of the task manager     * @param slots to start the task manager with     * @return Network environment configuration     */    @SuppressWarnings("deprecation")    private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(        Configuration configuration,        boolean localTaskManagerCommunication,        InetAddress taskManagerAddress,        int slots) throws Exception {        // ----> hosts / ports for communication and data exchange        int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);        checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),            "Leave config parameter empty or use 0 to let the system choose a port automatically.");        checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),            "Number of task slots must be at least one.");        final int pageSize = checkedDownCast(MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());        // check page size of for minimum size        checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,            TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),            "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);        // check page size for power of two        checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,            TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),            "Memory segment size must be a power of 2.");        // network buffer memory fraction        float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);        long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();        long networkBufMax = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();        checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);        // fallback: number of network buffers        final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);        checkNetworkConfigOld(numNetworkBuffers);        if (!hasNewNetworkBufConf(configuration)) {            // map old config to new one:            networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize;        } else {            if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {                LOG.info("Ignoring old (but still present) network buffer configuration via {}.",                    TaskManagerOptions.NETWORK_NUM_BUFFERS.key());            }        }        final NettyConfig nettyConfig;        if (!localTaskManagerCommunication) {            final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);            nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),                taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);        } else {            nettyConfig = null;        }        // Default spill I/O mode for intermediate results        final String syncOrAsync = configuration.getString(            ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,            ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);        final IOManager.IOMode ioMode;        if (syncOrAsync.equals("async")) {            ioMode = IOManager.IOMode.ASYNC;        } else {            ioMode = IOManager.IOMode.SYNC;        }        int initialRequestBackoff = configuration.getInteger(            TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);        int maxRequestBackoff = configuration.getInteger(            TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);        int buffersPerChannel = configuration.getInteger(            TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);        int extraBuffersPerGate = configuration.getInteger(            TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);        return new NetworkEnvironmentConfiguration(            networkBufFraction,            networkBufMin,            networkBufMax,            pageSize,            ioMode,            initialRequestBackoff,            maxRequestBackoff,            buffersPerChannel,            extraBuffersPerGate,            nettyConfig);    }    //......}
  • TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置

TaskManagerOptions

flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolvingpublic class TaskManagerOptions {    //......    /**     * Size of memory buffers used by the network stack and the memory manager.     */    public static final ConfigOption
MEMORY_SEGMENT_SIZE = key("taskmanager.memory.segment-size") .defaultValue("32kb") .withDescription("Size of memory buffers used by the network stack and the memory manager."); /** * Fraction of JVM memory to use for network buffers. */ public static final ConfigOption
NETWORK_BUFFERS_MEMORY_FRACTION = key("taskmanager.network.memory.fraction") .defaultValue(0.1f) .withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" + " data exchange channels a TaskManager can have at the same time and how well buffered the channels" + " are. If a job is rejected or you get a warning that the system has not enough buffers available," + " increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" + "` and \"taskmanager.network.memory.max\" may override this fraction."); /** * Minimum memory size for network buffers. */ public static final ConfigOption
NETWORK_BUFFERS_MEMORY_MIN = key("taskmanager.network.memory.min") .defaultValue("64mb") .withDescription("Minimum memory size for network buffers."); /** * Maximum memory size for network buffers. */ public static final ConfigOption
NETWORK_BUFFERS_MEMORY_MAX = key("taskmanager.network.memory.max") .defaultValue("1gb") .withDescription("Maximum memory size for network buffers."); /** * Number of buffers used in the network stack. This defines the number of possible tasks and * shuffles. * * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN}, * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead */ @Deprecated public static final ConfigOption
NETWORK_NUM_BUFFERS = key("taskmanager.network.numberOfBuffers") .defaultValue(2048); /** * Minimum backoff for partition requests of input channels. */ public static final ConfigOption
NETWORK_REQUEST_BACKOFF_INITIAL = key("taskmanager.network.request-backoff.initial") .defaultValue(100) .withDeprecatedKeys("taskmanager.net.request-backoff.initial") .withDescription("Minimum backoff in milliseconds for partition requests of input channels."); /** * Maximum backoff for partition requests of input channels. */ public static final ConfigOption
NETWORK_REQUEST_BACKOFF_MAX = key("taskmanager.network.request-backoff.max") .defaultValue(10000) .withDeprecatedKeys("taskmanager.net.request-backoff.max") .withDescription("Maximum backoff in milliseconds for partition requests of input channels."); /** * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). * *

Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization. */ public static final ConfigOption

NETWORK_BUFFERS_PER_CHANNEL = key("taskmanager.network.memory.buffers-per-channel") .defaultValue(2) .withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." + "In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" + " configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" + " for parallel serialization."); /** * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). */ public static final ConfigOption
NETWORK_EXTRA_BUFFERS_PER_GATE = key("taskmanager.network.memory.floating-buffers-per-gate") .defaultValue(8) .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." + " In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." + " The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" + " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" + " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster."); //......}

  • taskmanager.memory.segment-size指定memory segment的大小,默认为32kb;taskmanager.network.memory.fraction指定network buffers使用的memory的比例,默认为0.1;taskmanager.network.memory.min指定network buffers使用的最小内存,默认为64mb;taskmanager.network.memory.max指定network buffers使用的最大内存,默认为1gb;taskmanager.network.numberOfBuffers指定network使用的buffers数量,默认为2048,该配置已经被废弃,使用taskmanager.network.memory.fraction、taskmanager.network.memory.min、taskmanager.network.memory.max这几个配置来替代
  • taskmanager.network.request-backoff.initial指定input channels的partition requests的最小backoff时间(毫秒),默认为100;taskmanager.network.request-backoff.max指定input channels的partition requests的最大backoff时间(毫秒),默认为10000
  • taskmanager.network.memory.buffers-per-channel指定每个outgoing/incoming channel使用buffers数量,默认为2;taskmanager.network.memory.floating-buffers-per-gate指定每个outgoing/incoming gate使用buffers数量,默认为8

NettyConfig

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java

public class NettyConfig {    private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);    // - Config keys ----------------------------------------------------------    public static final ConfigOption
NUM_ARENAS = ConfigOptions .key("taskmanager.network.netty.num-arenas") .defaultValue(-1) .withDeprecatedKeys("taskmanager.net.num-arenas") .withDescription("The number of Netty arenas."); public static final ConfigOption
NUM_THREADS_SERVER = ConfigOptions .key("taskmanager.network.netty.server.numThreads") .defaultValue(-1) .withDeprecatedKeys("taskmanager.net.server.numThreads") .withDescription("The number of Netty server threads."); public static final ConfigOption
NUM_THREADS_CLIENT = ConfigOptions .key("taskmanager.network.netty.client.numThreads") .defaultValue(-1) .withDeprecatedKeys("taskmanager.net.client.numThreads") .withDescription("The number of Netty client threads."); public static final ConfigOption
CONNECT_BACKLOG = ConfigOptions .key("taskmanager.network.netty.server.backlog") .defaultValue(0) // default: 0 => Netty's default .withDeprecatedKeys("taskmanager.net.server.backlog") .withDescription("The netty server connection backlog."); public static final ConfigOption
CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions .key("taskmanager.network.netty.client.connectTimeoutSec") .defaultValue(120) // default: 120s = 2min .withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec") .withDescription("The Netty client connection timeout."); public static final ConfigOption
SEND_RECEIVE_BUFFER_SIZE = ConfigOptions .key("taskmanager.network.netty.sendReceiveBufferSize") .defaultValue(0) // default: 0 => Netty's default .withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize") .withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" + " (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux."); public static final ConfigOption
TRANSPORT_TYPE = ConfigOptions .key("taskmanager.network.netty.transport") .defaultValue("nio") .withDeprecatedKeys("taskmanager.net.transport") .withDescription("The Netty transport type, either \"nio\" or \"epoll\""); // ------------------------------------------------------------------------ enum TransportType { NIO, EPOLL, AUTO } static final String SERVER_THREAD_GROUP_NAME = "Flink Netty Server"; static final String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client"; private final InetAddress serverAddress; private final int serverPort; private final int memorySegmentSize; private final int numberOfSlots; private final Configuration config; // optional configuration public NettyConfig( InetAddress serverAddress, int serverPort, int memorySegmentSize, int numberOfSlots, Configuration config) { this.serverAddress = checkNotNull(serverAddress); checkArgument(serverPort >= 0 && serverPort <= 65536, "Invalid port number."); this.serverPort = serverPort; checkArgument(memorySegmentSize > 0, "Invalid memory segment size."); this.memorySegmentSize = memorySegmentSize; checkArgument(numberOfSlots > 0, "Number of slots"); this.numberOfSlots = numberOfSlots; this.config = checkNotNull(config); LOG.info(this.toString()); } InetAddress getServerAddress() { return serverAddress; } int getServerPort() { return serverPort; } int getMemorySegmentSize() { return memorySegmentSize; } public int getNumberOfSlots() { return numberOfSlots; } // ------------------------------------------------------------------------ // Getters // ------------------------------------------------------------------------ public int getServerConnectBacklog() { return config.getInteger(CONNECT_BACKLOG); } public int getNumberOfArenas() { // default: number of slots final int configValue = config.getInteger(NUM_ARENAS); return configValue == -1 ? numberOfSlots : configValue; } public int getServerNumThreads() { // default: number of task slots final int configValue = config.getInteger(NUM_THREADS_SERVER); return configValue == -1 ? numberOfSlots : configValue; } public int getClientNumThreads() { // default: number of task slots final int configValue = config.getInteger(NUM_THREADS_CLIENT); return configValue == -1 ? numberOfSlots : configValue; } public int getClientConnectTimeoutSeconds() { return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS); } public int getSendAndReceiveBufferSize() { return config.getInteger(SEND_RECEIVE_BUFFER_SIZE); } public TransportType getTransportType() { String transport = config.getString(TRANSPORT_TYPE); switch (transport) { case "nio": return TransportType.NIO; case "epoll": return TransportType.EPOLL; default: return TransportType.AUTO; } } @Nullable public SSLHandlerFactory createClientSSLEngineFactory() throws Exception { return getSSLEnabled() ? SSLUtils.createInternalClientSSLEngineFactory(config) : null; } @Nullable public SSLHandlerFactory createServerSSLEngineFactory() throws Exception { return getSSLEnabled() ? SSLUtils.createInternalServerSSLEngineFactory(config) : null; } public boolean getSSLEnabled() { return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED) && SSLUtils.isInternalSSLEnabled(config); } public boolean isCreditBasedEnabled() { return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL); } public Configuration getConfig() { return config; } @Override public String toString() { String format = "NettyConfig [" + "server address: %s, " + "server port: %d, " + "ssl enabled: %s, " + "memory segment size (bytes): %d, " + "transport type: %s, " + "number of server threads: %d (%s), " + "number of client threads: %d (%s), " + "server connect backlog: %d (%s), " + "client connect timeout (sec): %d, " + "send/receive buffer size (bytes): %d (%s)]"; String def = "use Netty's default"; String man = "manual"; return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true" : "false", memorySegmentSize, getTransportType(), getServerNumThreads(), getServerNumThreads() == 0 ? def : man, getClientNumThreads(), getClientNumThreads() == 0 ? def : man, getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man, getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(), getSendAndReceiveBufferSize() == 0 ? def : man); }}
  • NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置
  • taskmanager.network.netty.server.backlog用于指定netty server的connection backlog,默认值为0即使用netty默认的配置;taskmanager.network.netty.client.connectTimeoutSec指定netty client的connection timeout,默认为120(单位秒);taskmanager.network.netty.sendReceiveBufferSize指定netty send/receive buffer大小,默认为0即使用netty的默认配置,默认是使用system buffer size,即/proc/sys/net/ipv4/tcp_[rw]mem的配置;taskmanager.network.netty.transport指定的是netty transport的类型,默认是nio
  • taskmanager.network.netty.num-arenas指定的是netty arenas的数量,默认为-1;taskmanager.network.netty.server.numThreads指定的是netty server的threads数量,默认为-1;taskmanager.network.netty.client.numThreads指定的是netty client的threads数量,默认为-1;这几个配置当配置值为-1的时候,对应get方法返回的是numberOfSlots值

小结

  • NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性
  • TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置
  • NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置

doc

转载地址:http://ygqqa.baihongyu.com/

你可能感兴趣的文章
C++数组和指针
查看>>
恭贺自己itpub和csdn双双获得专家博客称号
查看>>
xml 转map dom4j
查看>>
Vitamio视频播放器
查看>>
Java编程的逻辑 (66) - 理解synchronized
查看>>
[置顶] android 自定义ListView实现动画特效
查看>>
机器学习A-Z~Logistic Regression
查看>>
聊聊flink的NetworkEnvironmentConfiguration
查看>>
【Go】strings.Replace 与 bytes.Replace 调优
查看>>
RSA签名的PSS模式
查看>>
c# 注销 代码
查看>>
ubuntu 安装-apache2-trac-ldap【验证】-svn-mysql
查看>>
Nginx 安装
查看>>
php GD库
查看>>
项目管理
查看>>
隐私政策
查看>>
二分搜索树
查看>>
[折半查找]排序数组中某个元素出现次数
查看>>
【11-01】Sublime text 学习笔记
查看>>
.wav file research
查看>>