[Repost] Elasticsearch Internals Series: Source Code Analysis of the Node Startup Process

Source Code Analysis of the Elasticsearch Node Startup Process

Introduction to Elasticsearch

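Elasticsearch is an open-source distributed search engine that offers near-real-time search and powerful aggregation and analytics. Together with the other official Elastic components (Beats, Logstash, Kibana), it forms the Elastic Stack, a complete solution for data ingestion, cleansing, storage, querying, and visualization across many scenarios. It is widely used for search, log analysis, statistical analysis, and similar workloads.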

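An Elasticsearch distributed cluster is made up of multiple nodes, and each node is called a Node. Based on Elasticsearch v6.4.3, this article focuses on the Node startup process. It also briefly outlines ES's main internal modules, thread pools, and related components.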

The Elasticsearch Startup Process

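The Elasticsearch startup flow mainly involves three classes: Elasticsearch, Bootstrap, and Node. It consists of three main steps:

  • Load the local environment: read the command-line arguments and configuration files, and build the local environment configuration
  • Create the Node: create the node instance, create the various service objects, and inject the functional modules
  • Start the Node: start the services and join the cluster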

Before walking through these three steps in detail, let's first look at the main entry point of Elasticsearch.

Main Entry Point

The Elasticsearch startup script (bin/elasticsearch) shows that the main entry point is org.elasticsearch.bootstrap.Elasticsearch.

Startup script

  exec \
    "$JAVA" \
    $ES_JAVA_OPTS \
    -Des.path.home="$ES_HOME" \
    -Des.path.conf="$ES_PATH_CONF" \
    -Des.distribution.flavor="$ES_DISTRIBUTION_FLAVOR" \
    -Des.distribution.type="$ES_DISTRIBUTION_TYPE" \
    -cp "$ES_CLASSPATH" \
    org.elasticsearch.bootstrap.Elasticsearch \
    "$@"

Main entry code

PATH

org.elasticsearch.bootstrap.Elasticsearch#main

CODE

    /**
     * Main entry point for starting elasticsearch
     */
    public static void main(final String[] args) throws Exception {
        // 1. Install a security manager that grants all permissions
        System.setSecurityManager(new SecurityManager() {
            @Override
            public void checkPermission(Permission perm) {
                // grant all permissions so that we can later set the security manager to the one that we want
            }
        });
        // 2. Register the log error listener
        LogConfigurator.registerErrorListener();
        // 3. Create the Elasticsearch command object
        final Elasticsearch elasticsearch = new Elasticsearch();
        int status = main(args, elasticsearch, Terminal.DEFAULT);
        if (status != ExitCodes.OK) {
            exit(status);
        }
    }

    static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
        return elasticsearch.main(args, terminal);
    }

Analysis

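The argument args holds the command-line arguments. The method performs the following four steps:

  1. Install a security manager that grants all permissions. In Java, a SecurityManager checks whether an application may access restricted resources such as files and sockets. The checkPermission override here grants every operation, so that the intended security manager can be installed later.
  2. Register the log error listener. Log listening is enabled as early as possible so that no log messages are lost.
  3. Create the Elasticsearch object. As shown in the figure, Elasticsearch inherits from EnvironmentAwareCommand, which in turn inherits from Command. The Elasticsearch() constructor calls the parent constructors, which register the command-line parsing rules used later when the arguments are parsed.

  4. Call elasticsearch.main() (which is actually Command#main) to carry out further initialization. If initialization returns an error status, the process exits.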

Step 1: Loading the Local Environment (Elasticsearch Initialization)

PATH

elasticsearch\libs\cli\src\main\java\org\elasticsearch\cli\Command.java

CODE

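    /** Parses options for this command from args and executes it. */
    public final int main(String[] args, Terminal terminal) throws Exception {
        if (addShutdownHook()) {

            shutdownHookThread = new Thread(() -> {
                try {
                    // Elasticsearch#close
                    this.close();
                } catch (final IOException e) {
                    try (
                        StringWriter sw = new StringWriter();
                        PrintWriter pw = new PrintWriter(sw)) {
                        // Closing failed: print the stack trace
                        e.printStackTrace(pw);
                        terminal.println(sw.toString());
                    } catch (final IOException impossible) {
                        // StringWriter#close declares IOException but never actually throws it
                        throw new AssertionError(impossible);
                    }
                }
            });
            // 1. Register the shutdown hook thread; it runs when the process exits
            Runtime.getRuntime().addShutdownHook(shutdownHookThread);
        }

        try {
            // 2. Parse the command-line arguments and execute the command
            mainWithoutErrorHandling(args, terminal);
        } catch (OptionException e) {
            printHelp(terminal);
            terminal.println(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
            return ExitCodes.USAGE;
        } catch (UserException e) {
            if (e.exitCode == ExitCodes.USAGE) {
                printHelp(terminal);
            }
            terminal.println(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
            return e.exitCode;
        }
        return ExitCodes.OK;
    }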
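The control flow of Command#main can be sketched in plain Java: optionally register a shutdown hook, run the command, and turn failures into an exit code. This is a minimal illustration under stated assumptions, not Elasticsearch code. MiniCommand, MiniElasticsearch, UsageException, and the numeric exit codes are hypothetical stand-ins for Command, Elasticsearch, the option-parsing exceptions, and ExitCodes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Command#main template method; not Elasticsearch code.
abstract class MiniCommand {
    static final int OK = 0;     // assumed exit code for success
    static final int USAGE = 64; // assumed exit code for bad arguments (sysexits.h EX_USAGE)

    /** Stand-in for the option-parsing errors that Command#main catches. */
    static final class UsageException extends Exception {
        UsageException(String message) {
            super(message);
        }
    }

    /** Messages the real code would print to the Terminal. */
    final List<String> output = new ArrayList<>();

    /** Subclasses opt in to a JVM shutdown hook that calls close(). */
    protected boolean addShutdownHook() {
        return false;
    }

    protected void close() {
    }

    /** The actual work; the Elasticsearch command would start the node here. */
    protected abstract void execute(List<String> args) throws UsageException;

    /** Template method: optional shutdown hook, run, then map failures to an exit code. */
    final int main(String[] args) {
        if (addShutdownHook()) {
            Runtime.getRuntime().addShutdownHook(new Thread(this::close));
        }
        try {
            execute(List.of(args));
        } catch (UsageException e) {
            output.add("ERROR: " + e.getMessage());
            return USAGE;
        }
        return OK;
    }
}

// Hypothetical subclass playing the role of Elasticsearch (Elasticsearch -> EnvironmentAwareCommand -> Command).
class MiniElasticsearch extends MiniCommand {
    @Override
    protected void execute(List<String> args) throws UsageException {
        for (String arg : args) {
            if (!arg.startsWith("-")) {
                throw new UsageException("positional arguments not allowed, found [" + arg + "]");
            }
        }
        output.add("starting with " + args.size() + " option(s)");
    }
}
```

Because main is a final template method, subclasses only supply execute(). This mirrors the division of labor between Command, EnvironmentAwareCommand, and Elasticsearch.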

Analysis

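Command#main has two main steps:

  1. addShutdownHook: register a callback thread with the runtime. When the process exits, this thread calls Elasticsearch#close and, if closing fails, prints the stack trace.
  2. mainWithoutErrorHandling: after handling some of the command-line options (-h, -v, -s), it calls EnvironmentAwareCommand#execute: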
    protected void execute(Terminal terminal, OptionSet options) throws Exception {
        final Map<String, String> settings = new HashMap<>();
        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
        execute(terminal, options, createEnv(terminal, settings));
    }
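  • Read the parameters passed as JVM system properties (-Des.path.data, -Des.path.home, -Des.path.logs) and put them into settings as path.data, path.home, and path.logs when those settings are missing, as shown in the figure.

  • Call createEnv, which uses prepareEnvironment to read the ES configuration file (config/elasticsearch.yml) and build an Environment that stores the various paths and the ES settings.
    public static Environment prepareEnvironment(Settings input, Terminal terminal, Map<String, String> properties, Path configPath) {
        // Build just enough settings to create an Environment and locate the config directory
        Settings.Builder output = Settings.builder();
        initializeSettings(output, input, properties);
        Environment environment = new Environment(output.build(), configPath);

        // Start over with a fresh builder and load config/elasticsearch.yml
        output = Settings.builder();
        Path path = environment.configFile().resolve("elasticsearch.yml");
        if (Files.exists(path)) {
            try {
                output.loadFromPath(path);
            } catch (IOException e) {
                throw new SettingsException("Failed to load settings from " + path.toString(), e);
            }
        }
        // ... (settings are re-initialized and finalized here; omitted)
        return new Environment(output.build(), configPath);
    }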
    

  • Call Elasticsearch#execute, which reads the daemonize, pidFile, and quiet options and then calls Elasticsearch#init -> Bootstrap.init to initialize Bootstrap.

    protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
        final boolean daemonize = options.has(daemonizeOption);
        final Path pidFile = pidfileOption.value(options);
        final boolean quiet = options.has(quietOption);
        try {
            init(daemonize, pidFile, quiet, env);
        } catch (NodeValidationException e) {
            throw new UserException(ExitCodes.CONFIG, e.getMessage());
        }
    }
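The following sketch shows how the -Des.path.* system properties set by the startup script can end up in the settings map. It assumes the behavior implied by the name putSystemPropertyIfSettingIsMissing: copy the property only when the setting is absent. SettingsFromSysProps is hypothetical, and so is the idea of a pre-populated initial map (for example, from command-line settings). The real EnvironmentAwareCommand may handle conflicting values differently, for instance by rejecting them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: copy -Des.path.* system properties into a settings map when the setting is missing.
final class SettingsFromSysProps {
    static void putSystemPropertyIfSettingIsMissing(Map<String, String> settings, String setting, String key) {
        String value = System.getProperty(key); // e.g. key "es.path.home", set by -Des.path.home=...
        if (value != null) {
            settings.putIfAbsent(setting, value); // in this sketch an existing setting wins
        }
    }

    /** Mirrors the three calls in EnvironmentAwareCommand#execute. */
    static Map<String, String> collect(Map<String, String> initial) {
        Map<String, String> settings = new HashMap<>(initial);
        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
        return settings;
    }
}
```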

Bootstrap Initialization

PATH

elasticsearch\bootstrap\Bootstrap.java

CODE

    /**
     * This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
     */
    static void init(
            final boolean foreground,
            final Path pidFile,
            final boolean quiet,
            final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {

        // 1. Create the Bootstrap instance; its constructor creates (but does not yet start) the keepAlive thread
        INSTANCE = new Bootstrap();

        // 2. Load the secure settings and the logging configuration, and create the pid file
        final SecureSettings keystore = loadSecureSettings(initialEnv);
        final Environment environment = createEnvironment(foreground, pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
        try {
            LogConfigurator.configure(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        if (environment.pidFile() != null) {
            try {
                PidFile.create(environment.pidFile(), true);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }
        }
        try {
            // Fail if the Lucene jars have been replaced
            checkLucene();
            // 3. Create the Node
            INSTANCE.setup(true, environment);
            // 4. Start the Node
            INSTANCE.start();
        } catch (NodeValidationException | RuntimeException e) {
            // ... (logging of the failure omitted)
            throw e;
        }
    }
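The pid-file step can be illustrated with a small sketch. It is loosely modeled on the PidFile.create(environment.pidFile(), true) call above, but it is not that implementation. PidFileSketch is hypothetical. It gets the process ID from ProcessHandle.current().pid() (Java 9+) and relies on File#deleteOnExit for the "delete on exit" flag.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of writing a pid file; not the Elasticsearch PidFile class.
final class PidFileSketch {
    /** Writes the current process ID to path, optionally deleting the file when the JVM exits. */
    static long create(Path path, boolean deleteOnExit) throws IOException {
        long pid = ProcessHandle.current().pid(); // Java 9+
        Path parent = path.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        // Default options (CREATE, TRUNCATE_EXISTING, WRITE) overwrite a stale file from a previous run
        Files.write(path, Long.toString(pid).getBytes(StandardCharsets.UTF_8));
        if (deleteOnExit) {
            path.toFile().deleteOnExit();
        }
        return pid;
    }

    /** Demo: write a pid file into a temporary directory and read it back. */
    static String roundTrip() {
        try {
            Path pidFile = Files.createTempDirectory("es-pid-sketch").resolve("elasticsearch.pid");
            create(pidFile, true);
            return Files.readString(pidFile, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```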
Analysis

Bootstrap#init performs the following steps in order:

1. Create the Bootstrap object. Its constructor creates keepAliveThread, which waits until keepAliveLatch drops to 0 and then exits. The constructor also registers a shutdown hook with the runtime: when the process exits, the hook counts keepAliveLatch down to 0 and keepAliveThread finishes. As the Thread Javadoc puts it, `The Java Virtual Machine exits when the only threads running are all daemon threads.` keepAliveThread is meant to be the only non-daemon thread, so once it exits, the JVM shuts down.

    /** creates a new instance */
    Bootstrap() {
        keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Wait for process exit: return once keepAliveLatch drops to 0
                    keepAliveLatch.await();
                } catch (InterruptedException e) {
                    // bail out
                }
            }
        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
        keepAliveThread.setDaemon(false);
        // keep this thread alive (non daemon thread) until we shutdown
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // On process exit, drop keepAliveLatch to 0 so that keepAliveThread finishes
                keepAliveLatch.countDown();
            }
        });
    }
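  2. Load the secure settings and the logging configuration, and create the pid file, which records the process ID of the ES process.

  3. Create the Node: an ES node is encapsulated as a Node instance. The Node drives the various ES modules to provide cluster management, indexing, search, and other functions.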

    private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
        Settings settings = environment.settings();

        try {
            // Walk the modules directory and spawn the native controller processes that some modules declare;
            // these processes stay connected to the JVM through stdin, stdout, and stderr
            spawner.spawnNativeControllers(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        // Check and configure the native environment (user, threads, virtual memory, file size, etc.)
        initializeNatives(
                environment.tmpFile(),
                BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
                BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
                BootstrapSettings.CTRLHANDLER_SETTING.get(settings));

        // Create the Node (covered in detail in the next section)
        node = new Node(environment) {
            @Override
            protected void validateNodeBeforeAcceptingRequests(
                final BootstrapContext context,
                final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
                BootstrapChecks.check(context, boundTransportAddress, checks);
            }
        };
    }

  4. Start the Node

    private void start() throws NodeValidationException {
        // Start the Node (covered in detail below)
        node.start();
        // Start the non-daemon keepAliveThread, which keeps the JVM alive until the process is shut down
        keepAliveThread.start();
    }
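The keep-alive pattern from step 1 can be reproduced in isolation: a non-daemon thread parked on a CountDownLatch, released by a shutdown hook. KeepAliveSketch is a hypothetical class written for illustration. Its demo() helper calls release() directly instead of waiting for a real JVM shutdown, so the behavior can be observed.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of the keep-alive pattern used by Bootstrap; not Elasticsearch code.
final class KeepAliveSketch {
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    final Thread keepAliveThread;

    KeepAliveSketch(boolean registerShutdownHook) {
        keepAliveThread = new Thread(() -> {
            try {
                keepAliveLatch.await(); // park until shutdown is signalled
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // bail out
            }
        }, "keepAlive-sketch");
        // Non-daemon: while this thread runs, the JVM does not exit on its own
        keepAliveThread.setDaemon(false);
        if (registerShutdownHook) {
            // On JVM shutdown, release the latch so the keep-alive thread can finish
            Runtime.getRuntime().addShutdownHook(new Thread(keepAliveLatch::countDown));
        }
    }

    void start() {
        keepAliveThread.start();
    }

    /** Does what the shutdown hook does, without shutting the JVM down. */
    void release() {
        keepAliveLatch.countDown();
    }

    /** Returns {alive after start, alive after release and join}. */
    static boolean[] demo() {
        KeepAliveSketch keepAlive = new KeepAliveSketch(false);
        keepAlive.start();
        boolean aliveAfterStart = keepAlive.keepAliveThread.isAlive();
        keepAlive.release();
        try {
            keepAlive.keepAliveThread.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {aliveAfterStart, keepAlive.keepAliveThread.isAlive()};
    }
}
```

In a launcher built this way, main() can return right after start(). The JVM keeps running because the non-daemon keep-alive thread is still parked. That thread finishes only when the shutdown hook releases the latch during JVM shutdown.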

Step 2: Creating the Node

PATH

elasticsearch\node\Node.java

CODE

The code is long, so it has been heavily trimmed:

    protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
        // Resources to release if construction fails
        final List<Closeable> resourcesToClose = new ArrayList<>();
        boolean success = false;
        try {
            // Node configuration; simplified (the real code also sets the client type)
            Settings tmpSettings = Settings.builder().put(environment.settings()).build();

            // 1. Create the node environment (nodeId, nodePaths, logger, etc.); tmpSettings holds the node configuration
            // create the node environment as soon as possible, to recover the node id and enable logging
            try {
                nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
                resourcesToClose.add(nodeEnvironment);
            } catch (IOException ex) {
                throw new IllegalStateException("Failed to create node environment", ex);
            }
            final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
            final String nodeId = nodeEnvironment.nodeId();
            tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
            final Logger logger = Loggers.getLogger(Node.class, tmpSettings);
            // this must be captured after the node name is possibly added to the settings
            final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
            if (hadPredefinedNodeName == false) {
                logger.info("node name derived from node ID [{}]; set [{}] to override", nodeId, NODE_NAME_SETTING.getKey());
            } else {
                logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
            }
            // 2. Log JVM information
            final JvmInfo jvmInfo = JvmInfo.jvmInfo();
            logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));

            // 3. Create PluginsService, loading every module under modules/ and every plugin under plugins/
            this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
            // Final settings, including those contributed by plugins
            this.settings = pluginsService.updatedSettings();

            // 4. Create Node.environment from the final settings
            this.environment = new Environment(this.settings, environment.configFile());

            // 5. Call each plugin's getExecutorBuilders to collect its ExecutorBuilders (custom thread pools)
            final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);

            // 6. Create the thread pools
            final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));

            // 7. Create the NodeClient
            client = new NodeClient(settings, threadPool);

            // 8. Create the ***Service objects and the ***Module objects
            ModulesBuilder modules = new ModulesBuilder();
            final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
            final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
            AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
            ...
            final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);

            // Bind the instances of the services and modules
            modules.add(b -> {
                    b.bind(Node.class).toInstance(this);
                    b.bind(NodeService.class).toInstance(nodeService);
                    ...
                    b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
                    b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
                }
            );
            injector = modules.createInjector();

            // 9. Initialize the REST handlers that later serve HTTP REST requests
            if (NetworkModule.HTTP_ENABLED.get(settings)) {
                logger.debug("initializing HTTP handlers ...");
                actionModule.initRestHandlers(() -> clusterService.state().nodes());
            }

            // Node initialization complete
            logger.info("initialized");
            success = true;
        } catch (IOException ex) {
            throw new ElasticsearchException("failed to bind service", ex);
        } finally {
            if (!success) {
                IOUtils.closeWhileHandlingException(resourcesToClose);
            }
        }
    }
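The binding step follows this pattern: modules.add(b -> b.bind(X.class).toInstance(x)), then modules.createInjector(), and later injector.getInstance(X.class) in Node#start. It boils down to a type-keyed registry of pre-built instances. The sketch below shows only that idea. TinyInjector and its Binder are hypothetical and, unlike Guice, do no constructor injection, scoping, or dependency resolution.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical, minimal stand-in for "bind(X.class).toInstance(x)" plus "injector.getInstance(X.class)".
// Unlike Guice it performs no constructor injection, scoping, or dependency resolution.
final class TinyInjector {
    private final Map<Class<?>, Object> bindings = new HashMap<>();

    /** Handed to each module, like the binder b in modules.add(b -> { ... }). */
    final class Binder {
        <T> void bindInstance(Class<T> type, T instance) {
            Objects.requireNonNull(instance, "instance");
            if (bindings.putIfAbsent(type, instance) != null) {
                throw new IllegalStateException("already bound: " + type.getName());
            }
        }
    }

    /** Runs every module against a single binder, like modules.createInjector(). */
    static TinyInjector create(List<Consumer<TinyInjector.Binder>> modules) {
        TinyInjector injector = new TinyInjector();
        Binder binder = injector.new Binder();
        for (Consumer<Binder> module : modules) {
            module.accept(binder);
        }
        return injector;
    }

    /** Looks up the instance bound to type, like injector.getInstance(IndicesService.class). */
    <T> T getInstance(Class<T> type) {
        Object instance = bindings.get(type);
        if (instance == null) {
            throw new IllegalStateException("no binding for " + type.getName());
        }
        return type.cast(instance);
    }
}
```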

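Analysis

  1. Create the node environment, including nodeId, nodePaths, logger, etc.; create tmpSettings, which mainly holds node configuration; and lock the data directories.

  2. Log JVM information
  3. Create PluginsService, which loads all modules and plugins from the classpath, the modules directory, and the plugins directory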

    public PluginsService(Settings settings, Path configPath, Path modulesDirectory, Path pluginsDirectory, Collection<Class<? extends Plugin>> classpathPlugins) {

        List<Tuple<PluginInfo, Plugin>> pluginsLoaded = new ArrayList<>();
        List<PluginInfo> pluginsList = new ArrayList<>();
        final List<String> pluginsNames = new ArrayList<>();

        // Load plugins from the classpath; used by tests and transport clients
        for (Class<? extends Plugin> pluginClass : classpathPlugins) {
            // ... (instantiate pluginClass as plugin and build its PluginInfo; omitted)
            pluginsLoaded.add(new Tuple<>(pluginInfo, plugin));
            pluginsList.add(pluginInfo);
        }

        Set<Bundle> seenBundles = new LinkedHashSet<>();
        List<PluginInfo> modulesList = new ArrayList<>();

        // Load modules
        if (modulesDirectory != null) {
            Set<Bundle> modules = getModuleBundles(modulesDirectory);
            for (Bundle bundle : modules) {
                ...
            }
            seenBundles.addAll(modules);
        }

        // Load the plugins under the plugins/ directory
        if (pluginsDirectory != null) {
            Set<Bundle> plugins = getPluginBundles(pluginsDirectory);
            for (final Bundle bundle : plugins) {
                pluginsList.add(bundle.plugin);
            }
            seenBundles.addAll(plugins);
        }

        // Each module and plugin loaded above is a bundle: a "bundle" is a group of jars in a single classloader,
        // so modules and plugins can be handled uniformly as Plugin instances here
        List<Tuple<PluginInfo, Plugin>> loaded = loadBundles(seenBundles);
        pluginsLoaded.addAll(loaded);

        // Save the metadata of the plugins and modules in PluginsAndModules info
        this.info = new PluginsAndModules(pluginsList, modulesList);

        // Keep the Plugin instances in List<Tuple<PluginInfo, Plugin>> plugins
        this.plugins = Collections.unmodifiableList(pluginsLoaded);

    }
  4. Create Node.environment

  5. Call each plugin's getExecutorBuilders to obtain its ExecutorBuilders

  6. Create the ThreadPool

    // ThreadPool constructor:
    public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
    
        final Map<String, ExecutorBuilder> builders = new HashMap<>();
        // Number of CPU cores on this machine; the comments below assume 8
        final int availableProcessors = EsExecutors.numberOfProcessors(settings);
        // (cpu+1)/2 bounded to [1,5]: here (8+1)/2 = 4 (integer division), which is within [1,5], so 4
        final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
        // (cpu+1)/2 bounded to [1,10]: here (8+1)/2 = 4, which is within [1,10], so 4
        final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
        // 4*8 = 32, bounded to [128,512], so genericThreadPoolMax = 128
        final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
    
        // Create the builder for each thread pool
        builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
        builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true));
        builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, "bulk", availableProcessors, 200));
        ...
    
        threadContext = new ThreadContext(settings);
    
        // Build each thread pool from its builder and settings
        final Map<String, ExecutorHolder> executors = new HashMap<>();
        for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
            final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
            final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
        }
        executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
        this.executors = unmodifiableMap(executors);
    }

    Thread pool types:

  • DIRECT: org.elasticsearch.common.util.concurrent.EsExecutors#DIRECT_EXECUTOR_SERVICE. It runs each submitted Runnable directly on the caller's current thread, and the executor service itself cannot be shut down. Its Javadoc reads: "an {@link ExecutorService} that executes submitted tasks on the current thread. This executor service does not support being shutdown."
  • FIXED: a fixed number of threads plus a queue of fixed length. When no thread is idle, requests wait in the queue. Parameters:
    size      the fixed number of threads
    queueSize the size of the backing queue, -1 for unbounded
  • SCALING: the number of threads varies dynamically between core and max. Parameters:
    core      the minimum number of threads in the pool
    max       the maximum number of threads in the pool
    keepAlive the time that spare threads above {@code core} threads will be kept alive
  • FIXED_AUTO_QUEUE_SIZE: a fixed number of threads plus a queue whose length is not fixed but adjusted between minQueueSize and maxQueueSize. Parameters:
    size             the fixed number of threads
    initialQueueSize initial size of the backing queue
    minQueueSize     the minimum size of the backing queue
    maxQueueSize     the maximum size of the backing queue
    Thread pools in ES:

| Thread pool | Type | Purpose | Default parameters |
|---|---|---|---|
| SAME | DIRECT | Runs logic directly on the current thread | - |
| LISTENER | FIXED | Runs logic when the Java client receives a response | size: min((availableProcessors + 1) / 2, 10), queueSize: 10 |
| GET | FIXED | get requests | size: availableProcessors, queueSize: 200 |
| ANALYZE | FIXED | analyze (tokenization) requests | size: 1, queueSize: 16 |
| WRITE | FIXED | put/bulk requests | size: availableProcessors, queueSize: 200 |
| FORCE_MERGE | FIXED | segment force-merge operations | size: 1, queueSize: -1 |
| GENERIC | SCALING | General-purpose work such as node discovery | core: 4, max: 4 * availableProcessors bounded to [128, 512], keepAlive: 30s |
| MANAGEMENT | SCALING | Cluster management and similar tasks | core: 1, max: 5, keepAlive: 5m |
| FLUSH | SCALING | flush operations | core: 1, max: min((availableProcessors + 1) / 2, 5), keepAlive: 5m |
| REFRESH | SCALING | refresh operations | core: 1, max: min((availableProcessors + 1) / 2, 10), keepAlive: 5m |
| WARMER | SCALING | segment warm-up operations | core: 1, max: min((availableProcessors + 1) / 2, 5), keepAlive: 5m |
| SNAPSHOT | SCALING | snapshot operations | core: 1, max: min((availableProcessors + 1) / 2, 5), keepAlive: 5m |
| FETCH_SHARD_STARTED | SCALING | fetching the started state of shards | core: 1, max: availableProcessors * 2, keepAlive: 5m |
| FETCH_SHARD_STORE | SCALING | fetching shard store information | core: 1, max: availableProcessors * 2, keepAlive: 5m |
| SEARCH | FIXED_AUTO_QUEUE_SIZE | search/count/suggest requests | size: ((availableProcessors * 3) / 2) + 1, initialQueueSize: 1000, minQueueSize: 1000, maxQueueSize: 1000 |
  7. Create the client, NodeClient
  8. Create the various service objects (xxxService) and register the modules they use (xxxModule) with Guice
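  • Services:

| Service | Description |
|---|---|
| ResourceWatcherService | Periodically checks watched resources (such as files) on behalf of other services |
| NetworkService | Network configuration management (TCP, IP addresses, ports, etc.) |
| ClusterService | Cluster management: publishing and updating the cluster state |
| IngestService | Pre-processing of incoming documents on ingest nodes |
| ClusterInfoService | Fetches the latest cluster information |
| UsageService | Tracks how the various Elasticsearch features are used |
| MonitorService | Monitoring of the JVM, the process, and the operating system |
| CircuitBreakerService | Circuit breakers: blocks tasks when resource usage exceeds limits |
| MetaStateService | Reads and writes MetaData and IndexMetaData |
| IndicesService | Index management: creating, deleting indices, etc. |
| IndicesClusterStateService | Applies index-related changes when the cluster state is updated |
| MetaDataIndexUpgradeService | Upgrades IndexMetaData to the latest version |
| TemplateUpgradeService | Upgrades plugin-related templates when a node joins the cluster |
| TransportService | Transport-layer networking |
| HttpServerTransport | HTTP-layer networking; serves the REST API |
| ResponseCollectorService | Collects per-node statistics on task queue size, response time, and service time |
| NodeService | Represents a node instance and is responsible for invoking the various services |
| SearchService | Executes search tasks |
| SnapshotsService | Snapshot service |
| Discovery | Cluster discovery service |
| RoutingService | Routing table management |
| GatewayService | Persistence and recovery of cluster and index metadata |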
  • Modules: ScriptModule, AnalysisModule, SettingsModule, PluginModule, ClusterModule, IndicesModule, SearchModule, GatewayModule, RepositoriesModule, ActionModule, NetworkModule, DiscoveryModule. Each module's role roughly matches the service of the same name above.
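The sizing rules in the thread pool table, and the SCALING type described in step 6, can be sketched with plain java.util.concurrent classes. The helper methods re-derive the defaults quoted above: (cpus + 1) / 2 bounded to [1, 5] or [1, 10], 4 * cpus bounded to [128, 512], and (cpus * 3) / 2 + 1 for SEARCH.

SCALING needs a trick on the JDK. A plain ThreadPoolExecutor with an unbounded queue never grows past its core size, because it adds non-core threads only when the queue refuses a task. The sketch therefore uses a queue that accepts a task only when an idle worker is already waiting, plus a rejection handler that enqueues the task once the pool is at max.

Elasticsearch builds its pools in EsExecutors. ThreadPoolSketch and HandOffQueue are hypothetical names for an independent illustration, not that code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: default pool sizes from the table, plus a grow-then-queue SCALING executor.
final class ThreadPoolSketch {
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    static int halfProcMaxAt5(int cpus) { return boundedBy((cpus + 1) / 2, 1, 5); }   // FLUSH/WARMER/SNAPSHOT max
    static int halfProcMaxAt10(int cpus) { return boundedBy((cpus + 1) / 2, 1, 10); } // LISTENER size, REFRESH max
    static int genericMax(int cpus) { return boundedBy(4 * cpus, 128, 512); }         // GENERIC max
    static int searchSize(int cpus) { return (cpus * 3) / 2 + 1; }                     // SEARCH size

    /** offer() succeeds only if an idle worker is already waiting to take the task. */
    static final class HandOffQueue extends LinkedTransferQueue<Runnable> {
        @Override
        public boolean offer(Runnable task) {
            return tryTransfer(task);
        }

        void forceOffer(Runnable task) {
            super.offer(task); // unconditional enqueue
        }
    }

    /** Grows from core to max threads first, and queues only once max threads are busy. */
    static ThreadPoolExecutor scaling(int core, int max, long keepAlive, TimeUnit unit) {
        HandOffQueue queue = new HandOffQueue();
        return new ThreadPoolExecutor(core, max, keepAlive, unit, queue, (task, executor) -> {
            // Reached when offer() failed and the pool may not add more threads
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("executor is shut down");
            }
            queue.forceOffer(task);
        });
    }

    /** Submits 10 blocking tasks to a 1..4 scaling pool; returns {poolSize, queued, completed}. */
    static int[] scalingDemo() {
        ThreadPoolExecutor pool = scaling(1, 4, 30, TimeUnit.SECONDS);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep every worker busy until the pool has been inspected
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        int poolSize = pool.getPoolSize();
        int queued = pool.getQueue().size();
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {poolSize, queued, completed.get()};
    }
}
```

For the 8-core example in the ThreadPool constructor comments, halfProcMaxAt5(8) and halfProcMaxAt10(8) are both 4, genericMax(8) is 128, and searchSize(8) is 13. In scalingDemo(), the first four tasks each get their own thread and the remaining six wait in the queue.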

Step 3: Starting the Node

PATH

elasticsearch\node\Node.java

CODE

    /**
     * Start the node. If the node is already started, this method is no-op.
     */
    public Node start() throws NodeValidationException {
        // 1. Lifecycle state machine: move the local node to the STARTED state (no-op if it is already started)
        if (!lifecycle.moveToStarted()) {
            return this;
        }

        logger.info("starting ...");

        // LifecycleComponent in modules and plugins start
        pluginLifecycleComponents.forEach(LifecycleComponent::start);

        // 2. Fetch the module and service instances bound when the Node was created, and start them
        // AbstractLifecycleComponent.start() -> class.doStart()

        injector.getInstance(MappingUpdatedAction.class).setClient(client);
        injector.getInstance(IndicesService.class).start();
        injector.getInstance(IndicesClusterStateService.class).start();
        injector.getInstance(SnapshotsService.class).start();
        injector.getInstance(SnapshotShardsService.class).start();
        injector.getInstance(RoutingService.class).start();
        injector.getInstance(SearchService.class).start();
        nodeService.getMonitorService().start();
        ...
        Discovery discovery = injector.getInstance(Discovery.class);
        clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

        // Start the transport service
        TransportService transportService = injector.getInstance(TransportService.class);
        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
        transportService.start();

        // Load the local MetaData from disk
        final MetaData onDiskMetadata;
        try {
            if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
                onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
            } else {
                onDiskMetadata = MetaData.EMPTY_META_DATA;
            }

        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        // Run the bootstrap checks: BootstrapChecks.check
        validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(),
            pluginsService.filterPlugins(Plugin.class).stream()
                .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));

        // Initialize the ClusterState and start Discovery
        discovery.start();
        // Start clusterService, clusterApplierService, and masterService
        clusterService.start();

        // Transport layer: start accepting incoming requests
        transportService.acceptIncomingRequests();
        // Initial discovery -> ZenDiscovery.java innerJoinCluster(): join the cluster
        discovery.startInitialJoin();

        // HTTP layer: start accepting requests
        injector.getInstance(HttpServerTransport.class).start();

        // The node has started successfully
        logger.info("started");

        pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

        return this;
    }
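The guard at the top of Node#start is a small state machine: lifecycle.moveToStarted() returns false when the node is already started, which makes start() a no-op. The sketch below assumes four states (INITIALIZED, STARTED, STOPPED, CLOSED) and treats starting from CLOSED as an error. LifecycleSketch and StartOnce are hypothetical classes, not the Elasticsearch Lifecycle implementation.

```java
// Hypothetical sketch of the lifecycle guard used by Node#start; not the Elasticsearch Lifecycle class.
final class LifecycleSketch {
    enum State { INITIALIZED, STARTED, STOPPED, CLOSED }

    private State state = State.INITIALIZED;

    synchronized State state() {
        return state;
    }

    /** Returns true if the caller should run its start logic, false if it is already started. */
    synchronized boolean moveToStarted() {
        switch (state) {
            case INITIALIZED:
            case STOPPED:
                state = State.STARTED;
                return true;
            case STARTED:
                return false; // already started: start() becomes a no-op
            default:
                throw new IllegalStateException("cannot move to STARTED from " + state);
        }
    }

    synchronized void moveToClosed() {
        state = State.CLOSED;
    }
}

// Hypothetical component whose start() mirrors the early return at the top of Node#start.
final class StartOnce {
    final LifecycleSketch lifecycle = new LifecycleSketch();
    int startCount = 0;

    StartOnce start() {
        if (!lifecycle.moveToStarted()) {
            return this;
        }
        startCount++; // stands in for starting the services
        return this;
    }
}
```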

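Analysis

Starting the node mainly means fetching the instances bound for each service and module and calling each instance's start() method (which in turn calls the class's doStart()) to start the services. The most important steps are:

  1. Start the transport service, so that the node can later join the cluster through discovery
  2. If the node is master-eligible or a data node (node.master or node.data is true), load the local metadata to recover information about the previous cluster (for example, when a node restarts after a crash)
  3. Run the bootstrap checks, which inspect the current runtime environment, mainly operating-system and JVM settings, as shown in the figure. When the checks are enforced (for example, when the node binds or publishes to a non-loopback address, as in the log below), a failed check makes ES report an error and exit. See the official documentation, Bootstrap Checks, for what each check means.

  4. Start discovery and clusterService, and initialize the cluster metadata (ClusterState)
  5. Start accepting incoming transport requests, used for node-to-node communication
  6. Run the initial discovery and join the Elasticsearch cluster the node belongs to
  7. Start the HTTP service and begin accepting user requests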
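The bootstrap-check step combines two ideas visible in the code and log above. First, plugins contribute extra checks (filterPlugins(Plugin.class) followed by flatMap(p -> p.getBootstrapChecks().stream())). Second, the checks are enforced when the node binds or publishes to a non-loopback address. The sketch below models both under simplifying assumptions. BootstrapChecksSketch, Check, and PluginLike are hypothetical. Only bound addresses are examined, and on loopback-only nodes the failures are simply returned, where the real code would log them as warnings.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: collect bootstrap checks from plugins and enforce them on non-loopback nodes.
final class BootstrapChecksSketch {
    /** Returns null when the check passes, otherwise a failure message. */
    interface Check {
        String check();
    }

    /** Plugin base type; plugins may contribute extra checks. */
    interface PluginLike {
        default List<Check> getBootstrapChecks() {
            return List.of();
        }
    }

    /** Keeps the plugins that implement type, like pluginsService.filterPlugins(type). */
    static <T> List<T> filterPlugins(List<?> plugins, Class<T> type) {
        return plugins.stream().filter(type::isInstance).map(type::cast).collect(Collectors.toList());
    }

    /** Runs all checks; throws if any fails while enforcing, otherwise returns the failures. */
    static List<String> run(List<?> plugins, List<Check> builtInChecks, List<InetAddress> boundAddresses) {
        List<Check> checks = new ArrayList<>(builtInChecks);
        filterPlugins(plugins, PluginLike.class).stream()
                .flatMap(p -> p.getBootstrapChecks().stream())
                .forEach(checks::add);

        // Enforce only if the node is reachable from other machines
        boolean enforce = boundAddresses.stream().anyMatch(address -> !address.isLoopbackAddress());

        List<String> failures = new ArrayList<>();
        for (Check check : checks) {
            String failure = check.check();
            if (failure != null) {
                failures.add(failure);
            }
        }
        if (enforce && !failures.isEmpty()) {
            throw new IllegalStateException("bootstrap checks failed: " + failures);
        }
        return failures;
    }

    /** Builds an IPv4 address without a DNS lookup. */
    static InetAddress ipv4(int a, int b, int c, int d) {
        try {
            return InetAddress.getByAddress(new byte[] {(byte) a, (byte) b, (byte) c, (byte) d});
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```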

Startup Log

Finally, let's check the startup flow described above against an ES node's log:

# Start creating the Node
[2018-12-28T19:54:44,159][INFO ][o.e.n.Node               ] [] initializing ...
# Read the local directory and JVM information, and create the NodeEnvironment
[2018-12-28T19:54:44,376][INFO ][o.e.e.NodeEnvironment    ] [V9VXhfr] using [1] data paths, mounts [[项目 (D:)]], net usable_space [271.7gb], net total_space [310gb], types [NTFS]
[2018-12-28T19:54:44,376][INFO ][o.e.e.NodeEnvironment    ] [V9VXhfr] heap size [1gb], compressed ordinary object pointers [true]
# Log the node name and other node information
[2018-12-28T19:54:44,379][INFO ][o.e.n.Node               ] [V9VXhfr] node name derived from node ID [V9VXhfr9TvSyUfxyr-ZQWg]; set [node.name] to override
[2018-12-28T19:54:44,379][INFO ][o.e.n.Node               ] [V9VXhfr] version[6.4.3-SNAPSHOT], pid[19512], build[unknown/unknown/Unknown/Unknown], OS[Windows 7/6.1/amd64], JVM["Oracle Corporation"/Java HotSpot(TM) 64-Bit Server VM/10.0.1/10.0.1+10]
# Log the JVM information
[2018-12-28T19:54:44,379][INFO ][o.e.n.Node               ] [V9VXhfr] JVM arguments [-agentlib:jdwp=transport=dt_socket,address=127.0.0.1:8831,suspend=y,server=n, -Des.path.home=D:\elasticsearch_release\elasticsearch-6.4.3, -Des.path.conf=D:\elasticsearch_release\elasticsearch-6.4.3\config, -Djava.security.policy=D:\elasticsearch_release\elasticsearch-6.4.3\config\java.policy, -Dlog4j2.disable.jmx=true, -Xms1g, -Xmx1g, -javaagent:C:\Users\morningchen\.IntelliJIdea2018.3\system\groovyHotSwap\gragent.jar, -javaagent:C:\Users\morningchen\.IntelliJIdea2018.3\system\captureAgent\debugger-agent.jar, -Dfile.encoding=UTF-8]
# Initialize the Services and Modules
# Load the modules
[2018-12-28T19:54:53,359][INFO ][o.e.p.PluginsService     ] [V9VXhfr] loaded module [aggs-matrix-stats]
[2018-12-28T19:54:53,359][INFO ][o.e.p.PluginsService     ] [V9VXhfr] loaded module [analysis-common]
... 
[2018-12-28T19:54:53,362][INFO ][o.e.p.PluginsService     ] [V9VXhfr] loaded module [x-pack-watcher]
# Load the plugins
[2018-12-28T19:54:53,362][INFO ][o.e.p.PluginsService     ] [V9VXhfr] no plugins loaded
[2018-12-28T19:54:58,078][DEBUG][o.e.a.ActionModule       ] Using REST wrapper from plugin org.elasticsearch.xpack.security.Security
[2018-12-28T19:54:58,271][INFO ][o.e.d.DiscoveryModule    ] [V9VXhfr] using discovery type [zen]
[2018-12-28T19:54:59,102][INFO ][o.e.n.Node               ] [V9VXhfr] initialized
# Start the Node
[2018-12-28T19:54:59,102][INFO ][o.e.n.Node               ] [V9VXhfr] starting ...
# Start the Transport service
[2018-12-28T19:54:59,428][INFO ][o.e.t.TransportService   ] [V9VXhfr] publish_address {10.40.98.48:9300}, bound_addresses {127.0.0.1:9300}, {[::1]:9300}
# BootstrapChecks
[2018-12-28T19:54:59,442][INFO ][o.e.b.BootstrapChecks    ] [V9VXhfr] bound or publishing to a non-loopback address, enforcing bootstrap checks
# Join the cluster
[2018-12-28T19:55:54,939][INFO ][o.e.c.s.MasterService    ] [V9VXhfr] zen-disco-elected-as-master ([0] nodes joined)[, ], reason: new_master {V9VXhfr}{V9VXhfr9TvSyUfxyr-ZQWg}{m7N1OuBHRSuJlyGOoIJmqw}{10.40.98.48}{10.40.98.48:9300}{ml.machine_memory=17099067392, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}
[2018-12-28T19:55:54,945][INFO ][o.e.c.s.ClusterApplierService] [V9VXhfr] new_master {V9VXhfr}{V9VXhfr9TvSyUfxyr-ZQWg}{m7N1OuBHRSuJlyGOoIJmqw}{10.40.98.48}{10.40.98.48:9300}{ml.machine_memory=17099067392, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}, reason: apply cluster state (from master [master {V9VXhfr}{V9VXhfr9TvSyUfxyr-ZQWg}{m7N1OuBHRSuJlyGOoIJmqw}{10.40.98.48}{10.40.98.48:9300}{ml.machine_memory=17099067392, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true} committed version [1] source [zen-disco-elected-as-master ([0] nodes joined)[, ]]])
# Start the HTTP service
[2018-12-28T19:55:58,722][INFO ][o.e.x.s.t.n.SecurityNetty4HttpServerTransport] [V9VXhfr] publish_address {10.40.98.48:9200}, bound_addresses {127.0.0.1:9200}, {[::1]:9200}
# Node startup complete
[2018-12-28T19:55:58,723][INFO ][o.e.n.Node               ] [V9VXhfr] started
# License check
[2018-12-28T19:55:58,902][INFO ][o.e.l.LicenseService     ] [V9VXhfr] license [a4e08819-7a8b-4017-8b85-5329bc2909b0] mode [basic] - valid
# Start recovering local data
[2018-12-28T19:55:58,926][INFO ][o.e.g.GatewayService     ] [V9VXhfr] recovered [0] indices into cluster_state

Author: morningchen, Tencent engineer
Source: Tencent Cloud
Original article: cloud.tencent.com/developer/articl...
