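[Repost] Elasticsearch Internals Series: Source Code Analysis of the Node Startup Process
Source Code Analysis of the Elasticsearch Node Startup Process
Introduction to Elasticsearch
Elasticsearch is an open-source distributed search engine that provides near-real-time search and powerful aggregation and analytics. Together with the other components from Elastic (Beats, Logstash, Kibana), it forms the Elastic Stack, a complete solution for data ingestion, cleansing, storage, querying, and visualization across many scenarios. It is widely used for search, log analysis, statistical analysis, and similar workloads.
An Elasticsearch cluster is a distributed system made up of multiple nodes, each of which is called a Node. Based on Elasticsearch v6.4.3, this article focuses on how a Node starts up, and also briefly covers the main internal modules and thread pools of ES.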
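Elasticsearch Startup Process
The startup flow mainly involves three classes: Elasticsearch, Bootstrap, and Node. It consists of three steps:
- Load the local environment: read the command-line arguments and configuration files and build the local environment configuration
- Create the Node: create the node instance, create the service objects, and inject the functional modules
- Start the Node: start the services and join the cluster
Before looking at these three steps in detail, we first introduce the main entry point of Elasticsearch.
Main Program Entry Point
The Elasticsearch startup script (bin/elasticsearch) shows that the main entry point is org.elasticsearch.bootstrap.Elasticsearch.
Startup script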
exec \
  "$JAVA" \
  $ES_JAVA_OPTS \
  -Des.path.home="$ES_HOME" \
  -Des.path.conf="$ES_PATH_CONF" \
  -Des.distribution.flavor="$ES_DISTRIBUTION_FLAVOR" \
  -Des.distribution.type="$ES_DISTRIBUTION_TYPE" \
  -cp "$ES_CLASSPATH" \
  org.elasticsearch.bootstrap.Elasticsearch \
  "$@"
Entry point
PATH
org.elasticsearch.bootstrap.Elasticsearch#main
CODE
/**
* Main entry point for starting elasticsearch
*/
public static void main(final String[] args) throws Exception {
// 1. Install a security manager that grants all permissions
System.setSecurityManager(new SecurityManager() {
@Override
public void checkPermission(Permission perm) {
// grant all permissions so that we can later set the security manager to the one that we want
}
});
// 2. Register the log error listener
LogConfigurator.registerErrorListener();
// 3. Create the Elasticsearch object
final Elasticsearch elasticsearch = new Elasticsearch();
int status = main(args, elasticsearch, Terminal.DEFAULT);
if (status != ExitCodes.OK) {
exit(status);
}
}
static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
return elasticsearch.main(args, terminal);
}
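Analysis
The parameter args holds the command-line arguments. The function performs the following steps:
- Install a security manager that grants all permissions: in Java, SecurityManager is used to check whether an application may access restricted resources such as files and sockets. The checkPermission method here grants every operation.
- Register the log listener: error logging is enabled as early as possible so that no log output is lost.
- Create the Elasticsearch object. As the figure in the original article shows, Elasticsearch extends EnvironmentAwareCommand, which in turn extends Command. The Elasticsearch() constructor calls the parent constructors and registers the command-line parsing rules, which are used later when the arguments are parsed.
- Call elasticsearch.main() for further initialization (this is actually Command#main). If initialization fails, the process exits.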
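The delegation described in the last two steps follows a template-method pattern: the base class owns `main`, maps usage errors to an exit code, and hands the real work to a subclass hook. The sketch below is a simplified stand-in, not the actual `org.elasticsearch.cli.Command`. The names `CommandSketch`, `BaseCommand`, and `Launcher` are hypothetical, the accepted flags are illustrative only, and the exit code values are assumed to mirror `ExitCodes.OK` and `ExitCodes.USAGE`.

```java
import java.util.ArrayList;
import java.util.List;

final class CommandSketch {
    static final int OK = 0;     // assumed to mirror ExitCodes.OK
    static final int USAGE = 64; // assumed to mirror ExitCodes.USAGE

    /** Hypothetical base class: owns main() and the error handling. */
    abstract static class BaseCommand {
        /** Template method: runs the command and maps usage errors to an exit code. */
        public final int main(String[] args, List<String> terminal) {
            try {
                execute(args, terminal);
            } catch (IllegalArgumentException e) {
                terminal.add("ERROR: " + e.getMessage());
                return USAGE;
            }
            return OK;
        }

        /** Hook implemented by concrete commands. */
        protected abstract void execute(String[] args, List<String> terminal);
    }

    /** Hypothetical concrete command that accepts only two illustrative flags. */
    static final class Launcher extends BaseCommand {
        @Override
        protected void execute(String[] args, List<String> terminal) {
            for (String arg : args) {
                if (!arg.equals("-d") && !arg.equals("-q")) {
                    throw new IllegalArgumentException("unknown option " + arg);
                }
            }
            terminal.add("starting node");
        }
    }

    public static void main(String[] args) {
        List<String> terminal = new ArrayList<>();
        int status = new Launcher().main(args, terminal);
        terminal.forEach(System.out::println);
        if (status != OK) {
            System.exit(status); // same shape as the exit(status) call in Elasticsearch#main
        }
    }
}
```

Keeping `main` final in the base class means every command gets the same error handling, while subclasses only decide what `execute` does.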
Step 1: Load the Local Environment: Elasticsearch Initialization
PATH
elasticsearch\libs\cli\src\main\java\org\elasticsearch\cli\Command.java
CODE
/** Parses options for this command from args and executes it. */
public final int main(String[] args, Terminal terminal) throws Exception {
if (addShutdownHook()) {
shutdownHookThread = new Thread(() -> {
try {
// Elasticsearch#close
this.close();
} catch (final IOException e) {
try (
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw)) {
// Print the stack trace if closing fails
e.printStackTrace(pw);
terminal.println(sw.toString());
}
}
});
// 1. Register a shutdown hook thread that runs when the process exits
Runtime.getRuntime().addShutdownHook(shutdownHookThread);
}
try {
// 2. Parse the command-line arguments
mainWithoutErrorHandling(args, terminal);
} catch (OptionException e) {
printHelp(terminal);
terminal.println(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
return ExitCodes.USAGE;
}
return ExitCodes.OK;
}
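Analysis
Command#main has two main steps:
- addShutdownHook: registers a callback thread with the runtime that calls Elasticsearch#close when the process exits; if closing fails, the stack trace is printed
- mainWithoutErrorHandling: parses some of the command-line options (-h, -v, -s) and then calls EnvironmentAwareCommand#execute: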
protected void execute(Terminal terminal, OptionSet options) throws Exception {
    final Map<String, String> settings = new HashMap<>();
    putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
    putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
    putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
    execute(terminal, options, createEnv(terminal, settings));
}
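- Read the parameters passed as VM options and put them into settings, as shown in the figure in the original article.
- Call createEnv, which uses prepareEnvironment to read the ES configuration file (config/elasticsearch.yml) and build an Environment that stores paths and the ES configuration.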
public static Environment prepareEnvironment(Settings input, Terminal terminal, Map<String, String> properties, Path configPath) {
    // (abridged) a preliminary `environment` is built from the input settings first, to locate the config directory
    Settings.Builder output = Settings.builder();
    Path path = environment.configFile().resolve("elasticsearch.yml");
    if (Files.exists(path)) {
        try {
            output.loadFromPath(path);
        } catch (IOException e) {
            throw new SettingsException("Failed to load settings from " + path.toString(), e);
        }
    }
    return new Environment(output.build(), configPath);
}
- Call Elasticsearch#execute, which reads the daemonize, pidFile, and quiet options and then calls Elasticsearch#init -> Bootstrap.init to initialize Bootstrap.
protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
    final boolean daemonize = options.has(daemonizeOption);
    final Path pidFile = pidfileOption.value(options);
    final boolean quiet = options.has(quietOption);
    try {
        init(daemonize, pidFile, quiet, env);
    } catch (NodeValidationException e) {
        throw new UserException(ExitCodes.CONFIG, e.getMessage());
    }
}
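The way `EnvironmentAwareCommand#execute` folds `-Des.path.*` system properties into the settings map can be sketched as follows. This is an illustration under assumptions, not the ES source: it assumes that a system property is copied only when it is actually set, and that supplying the same setting twice is treated as an error. The class name `SettingsDefaultsSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class SettingsDefaultsSketch {
    /**
     * Copies the system property {@code key} into {@code settings} under the name {@code setting}.
     * Assumption: a setting that is already present in the map is rejected as a duplicate.
     */
    static void putSystemPropertyIfSettingIsMissing(Map<String, String> settings, String setting, String key) {
        final String value = System.getProperty(key);
        if (value == null) {
            return; // the property was not passed with -D, nothing to do
        }
        if (settings.containsKey(setting)) {
            throw new IllegalArgumentException("duplicate setting [" + setting + "] found via [" + key + "]");
        }
        settings.put(setting, value);
    }

    public static void main(String[] args) {
        // Equivalent to launching the JVM with -Des.path.home=/opt/elasticsearch
        System.setProperty("es.path.home", "/opt/elasticsearch");
        Map<String, String> settings = new HashMap<>();
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
        System.out.println(settings);
    }
}
```

This matches the startup script shown earlier, which passes `es.path.home` and `es.path.conf` as `-D` options rather than as command-line arguments.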
Bootstrap initialization
PATH
elasticsearch\bootstrap\Bootstrap.java
CODE
/**
* This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
*/
static void init(
final boolean foreground,
final Path pidFile,
final boolean quiet,
final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
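// 1. Create the Bootstrap object and its keepAlive thread
INSTANCE = new Bootstrap();
// 2. Load the secure settings and logging configuration, then create the pidFile
final SecureSettings keystore = loadSecureSettings(initialEnv);
final Environment environment = createEnvironment(foreground, pidFile, keystore, initialEnv.settings(), initialEnv.configFile());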
try {
LogConfigurator.configure(environment);
} catch (IOException e) {
throw new BootstrapException(e);
}
if (environment.pidFile() != null) {
try {
PidFile.create(environment.pidFile(), true);
} catch (IOException e) {
throw new BootstrapException(e);
}
}
try {
// Check the version of the Lucene jars
checkLucene();
// 3. Create the Node
INSTANCE.setup(true, environment);
// 4. Start the Node
INSTANCE.start();
} catch (NodeValidationException | RuntimeException e) {
// error handling omitted in this excerpt
throw e;
}
}
Analysis
Bootstrap#init performs the following steps in order:
1. Create the Bootstrap object and the keepAliveThread, which exits once keepAliveLatch reaches 0. A ShutdownHook is also added to the runtime: when the process exits, keepAliveLatch drops to 0 and keepAliveThread exits. `The Java Virtual Machine exits when the only threads running are all daemon threads.` keepAliveThread is the only non-daemon thread, so once it exits, the JVM shuts down.
/** creates a new instance */
Bootstrap() {
keepAliveThread = new Thread(new Runnable() {
public void run() {
try {
// Wait for the process to exit: this thread ends once keepAliveLatch reaches 0.
keepAliveLatch.await();
} catch (InterruptedException e) {
// bail out
}
}
}, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
keepAliveThread.setDaemon(false);
// keep this thread alive (non daemon thread) until we shutdown
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
// On process exit, keepAliveLatch drops to 0 and keepAliveThread exits.
keepAliveLatch.countDown();
}
});
}
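2. Set up the security and logging configuration and create the pidFile. The pidFile records the process ID of ES and helps prevent multiple ES processes from reading and writing the same path.
3. Create the Node: an ES node is wrapped in a Node instance, which calls the ES modules to provide cluster management, indexing, search, and other functions.
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
    Settings settings = environment.settings();
    try {
        // Walk the modules directory and spawn a native controller for each module that provides one;
        // these controllers stay connected to the JVM through stdin, stdout, and stderr
        spawner.spawnNativeControllers(environment);
    } catch (IOException e) {
        throw new BootstrapException(e);
    }
    // Check and configure the local environment (user, threads, virtual memory, file size, etc.)
    initializeNatives(
        environment.tmpFile(),
        BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
        BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
        BootstrapSettings.CTRLHANDLER_SETTING.get(settings));
    // Create the Node; covered in detail in the next section
    node = new Node(environment) {
        @Override
        protected void validateNodeBeforeAcceptingRequests(
            final BootstrapContext context,
            final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
            BootstrapChecks.check(context, boundTransportAddress, checks);
        }
    };
}
4. Start the Node:
private void start() throws NodeValidationException {
    // Start the Node; covered in detail in a later section
    node.start();
    // Start the foreground keepAliveThread, which keeps the JVM running until the process is shut down
    keepAliveThread.start();
}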
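The keep-alive mechanism from step 1 can be reproduced with nothing but the JDK. The following is a minimal sketch, assuming a latch with a count of 1; the class `KeepAliveSketch` and its method names are hypothetical. In Bootstrap the latch is released from a shutdown hook, whereas here `stop()` is called directly so the effect can be observed.

```java
import java.util.concurrent.CountDownLatch;

final class KeepAliveSketch {
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    private final Thread keepAliveThread;

    KeepAliveSketch() {
        keepAliveThread = new Thread(() -> {
            try {
                keepAliveLatch.await(); // park until shutdown is requested
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // bail out
            }
        }, "sketch[keepAlive]");
        keepAliveThread.setDaemon(false); // a non-daemon thread keeps the JVM alive
    }

    void start() {
        keepAliveThread.start();
    }

    /** Releases the latch so that the keep-alive thread can finish. */
    void stop() {
        keepAliveLatch.countDown();
    }

    boolean isAlive() {
        return keepAliveThread.isAlive();
    }

    /** Waits up to {@code millis} for the thread to finish; returns true if it has exited. */
    boolean awaitExit(long millis) {
        try {
            keepAliveThread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !keepAliveThread.isAlive();
    }

    public static void main(String[] args) {
        KeepAliveSketch sketch = new KeepAliveSketch();
        sketch.start();
        System.out.println("alive after start: " + sketch.isAlive());
        sketch.stop();
        System.out.println("exited after stop: " + sketch.awaitExit(1000));
    }
}
```

Without the `stop()` call, the JVM running this `main` would never exit, which is exactly the property Bootstrap relies on.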
Step 2: Create the Node
PATH
elasticsearch\node\Node.java
CODE
The code is long, so it has been heavily abridged:
protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
try {
// 1. Create the node environment (nodeId, nodePaths, logger, etc.) and tmpSettings, which mainly holds node configuration
// create the node environment as soon as possible, to recover the node id and enable logging
try {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
} catch (IOException ex) {
throw new IllegalStateException("Failed to create node environment", ex);
}
final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
final String nodeId = nodeEnvironment.nodeId();
tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
final Logger logger = Loggers.getLogger(Node.class, tmpSettings);
// this must be captured after the node name is possibly added to the settings
final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
if (hadPredefinedNodeName == false) {
logger.info("node name derived from node ID [{}]; set [{}] to override", nodeId, NODE_NAME_SETTING.getKey());
} else {
logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
}
// 2. Log JVM information
final JvmInfo jvmInfo = JvmInfo.jvmInfo();
logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
// 3. Create the PluginsService and load all modules under the modules directory and all plugins under the plugins directory
this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
// 4. Create Node.environment
this.environment = new Environment(this.settings, environment.configFile());
// 5. Call getExecutorBuilders on each plugin to collect the ExecutorBuilders for their thread pools
final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
// 6. Create the thread pool
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
// 7. Create the NodeClient
client = new NodeClient(settings, threadPool);
// 8. Create the service objects (***Service) and module objects (***Module)
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
...
final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);
// Bind the instances of the services and modules
modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
...
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
}
);
injector = modules.createInjector();
// 9. Initialize the REST handlers that will serve HTTP REST requests
if (NetworkModule.HTTP_ENABLED.get(settings)) {
logger.debug("initializing HTTP handlers ...");
actionModule.initRestHandlers(() -> clusterService.state().nodes());
}
// Node initialization is complete
logger.info("initialized");
success = true;
} catch (IOException ex) {
throw new ElasticsearchException("failed to bind service", ex);
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(resourcesToClose);
}
}
}
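The constructor above repeatedly calls `pluginsService.filterPlugins(SomePlugin.class)` to hand each module only the plugins that implement a given extension interface. A minimal sketch of that idea is shown below. It is not the ES implementation: the interfaces and the two plugin classes are hypothetical stand-ins, and the filter is written with plain streams.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class PluginFilterSketch {
    // Hypothetical stand-ins for Plugin and two extension interfaces
    interface Plugin {}
    interface ScriptPlugin {}
    interface AnalysisPlugin {}

    static final class PainlessLike implements Plugin, ScriptPlugin {}
    static final class CommonAnalysisLike implements Plugin, AnalysisPlugin {}

    /** Returns the loaded plugins that implement the requested extension interface. */
    static <T> List<T> filterPlugins(List<Plugin> loaded, Class<T> type) {
        return loaded.stream()
            .filter(type::isInstance) // keep only plugins implementing the interface
            .map(type::cast)          // safe cast, guarded by the filter above
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Plugin> loaded = Arrays.asList(new PainlessLike(), new CommonAnalysisLike());
        System.out.println("script plugins: " + filterPlugins(loaded, ScriptPlugin.class).size());
        System.out.println("analysis plugins: " + filterPlugins(loaded, AnalysisPlugin.class).size());
    }
}
```

With this shape, a module such as ScriptModule never needs to know which concrete plugins are installed; it only sees the ones that opted in by implementing its interface.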
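Analysis
- Create the node environment, including nodeId, nodePaths, and logger; create tmpSettings, which mainly holds node configuration; lock the data directory.
- Log JVM information.
- Create the PluginsService and load everything from the classpath, the modules directory, and the plugins directory: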
public PluginsService(Settings settings, Path configPath, Path modulesDirectory, Path pluginsDirectory, Collection<Class<? extends Plugin>> classpathPlugins) {
List<Tuple<PluginInfo, Plugin>> pluginsLoaded = new ArrayList<>();
List<PluginInfo> pluginsList = new ArrayList<>();
final List<String> pluginsNames = new ArrayList<>();
// Load plugins from the classpath; used by tests and transport clients
for (Class<? extends Plugin> pluginClass : classpathPlugins) {
pluginsLoaded.add(new Tuple<>(pluginInfo, plugin));
pluginsList.add(pluginInfo);
}
Set<Bundle> seenBundles = new LinkedHashSet<>();
List<PluginInfo> modulesList = new ArrayList<>();
// Load modules
if (modulesDirectory != null) {
Set<Bundle> modules = getModuleBundles(modulesDirectory);
for (Bundle bundle : modules) {
...
}
seenBundles.addAll(modules);
}
// Load the plugins under the plugins/ directory
if (pluginsDirectory != null) {
Set<Bundle> plugins = getPluginBundles(pluginsDirectory);
for (final Bundle bundle : plugins) {
pluginsList.add(bundle.plugin);
}
seenBundles.addAll(plugins);
}
// Every module and plugin loaded above is a bundle; a "bundle" is a group of jars in a single classloader,
// so modules and plugins can be wrapped uniformly as Plugin instances here
List<Tuple<PluginInfo, Plugin>> loaded = loadBundles(seenBundles);
pluginsLoaded.addAll(loaded);
// Store the metadata of plugins and modules in the PluginsAndModules info
this.info = new PluginsAndModules(pluginsList, modulesList);
// Store the Plugin instances in List<Tuple<PluginInfo, Plugin>> plugins
this.plugins = Collections.unmodifiableList(pluginsLoaded);
}
- Create Node.environment
- Call getExecutorBuilders on each plugin to obtain the ExecutorBuilders
- Create the ThreadPool:
// ThreadPool constructor:
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
    final Map<String, ExecutorBuilder> builders = new HashMap<>();
    // Number of CPU cores on this machine; the comments below assume 8 cores
    final int availableProcessors = EsExecutors.numberOfProcessors(settings);
    // (cpu + 1) / 2 bounded to [1, 5]; here (8 + 1) / 2 = 4, which stays 4 within [1, 5]
    final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
    // (cpu + 1) / 2 bounded to [1, 10]; here (8 + 1) / 2 = 4, which stays 4 within [1, 10]
    final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
    // 4 * 8 = 32; bounded to [128, 512], genericThreadPoolMax becomes 128
    final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
    // Create the builders for the various thread pools
    builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
    builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true));
    builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, "bulk", availableProcessors, 200));
    ...
    threadContext = new ThreadContext(settings);
    // Create the thread pools
    final Map<String, ExecutorHolder> executors = new HashMap<>();
    for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
        final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
        final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
        executors.put(entry.getKey(), executorHolder);
    }
    executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
    this.executors = unmodifiableMap(executors);
}
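The sizing arithmetic in the constructor comments is easy to check for other core counts. The sketch below re-implements the formulas quoted in this article as plain static methods. The class `ThreadPoolSizingSketch` and its method names are hypothetical, and `boundedBy` is assumed to clamp a value into a closed interval, which is what the comments above describe.

```java
final class ThreadPoolSizingSketch {
    /** Clamps {@code value} into the closed interval [min, max]. */
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    static int halfProcMaxAt5(int processors) {
        return boundedBy((processors + 1) / 2, 1, 5);
    }

    static int halfProcMaxAt10(int processors) {
        return boundedBy((processors + 1) / 2, 1, 10);
    }

    static int genericMax(int processors) {
        return boundedBy(4 * processors, 128, 512);
    }

    /** Size of the SEARCH pool as listed in the article's thread pool table. */
    static int searchSize(int processors) {
        return ((processors * 3) / 2) + 1;
    }

    public static void main(String[] args) {
        for (int cpus : new int[] {2, 8, 32, 64}) {
            System.out.printf("cpus=%d halfMax5=%d halfMax10=%d genericMax=%d search=%d%n",
                cpus, halfProcMaxAt5(cpus), halfProcMaxAt10(cpus), genericMax(cpus), searchSize(cpus));
        }
    }
}
```

Note that the GENERIC maximum only rises above 128 on machines with more than 32 cores, because 4 * 32 is exactly the lower bound.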
Thread pool types:
- DIRECT: this is elasticsearch.common.util.concurrent.EsExecutors#DIRECT_EXECUTOR_SERVICE. It runs Runnable.run() on the caller's current thread, and the executor cannot be shut down. From the Javadoc: an {@link ExecutorService} that executes submitted tasks on the current thread. This executor service does not support being shutdown.
- FIXED: a fixed number of threads with a queue of fixed length. When no thread is idle, requests are placed in the queue. Parameters:
  - size: the fixed number of threads
  - queueSize: the size of the backing queue, -1 for unbounded
- SCALING: the number of threads is not fixed and varies dynamically between core and max. Parameters:
  - core: the minimum number of threads in the pool
  - max: the maximum number of threads in the pool
  - keepAlive: the time that spare threads above {@code core} threads will be kept alive
- FIXED_AUTO_QUEUE_SIZE: a fixed number of threads with a queue whose length is not fixed. Parameters:
  - size: the fixed number of threads
  - initialQueueSize: initial size of the backing queue
  - minQueueSize: the minimum size of the backing queue
  - maxQueueSize: the maximum size of the backing queue
Thread pools in ES:
Thread pool | Type | Description | Default parameters |
---|---|---|---|
SAME | DIRECT | Runs logic directly on the current thread | |
LISTENER | FIXED | Runs logic when the Java client receives a response | size: min((availableProcessors + 1) / 2, 10), queueSize: 10 |
GET | FIXED | Get requests | size: availableProcessors, queueSize: 200 |
ANALYZE | FIXED | Analyze (tokenization) operations | size: 1, queueSize: 16 |
WRITE | FIXED | Put/bulk requests | size: availableProcessors, queueSize: 200 |
FORCE_MERGE | FIXED | Segment force-merge operations | size: 1, queueSize: -1 |
GENERIC | SCALING | General-purpose work, such as node discovery | core: 4, max: 4 * availableProcessors bounded to [128, 512], keepAlive: 30s |
MANAGEMENT | SCALING | Cluster management and similar tasks | core: 1, max: 5, keepAlive: 5m |
FLUSH | SCALING | Flush operations | core: 1, max: min((availableProcessors + 1) / 2, 5), keepAlive: 5m |
REFRESH | SCALING | Refresh operations | core: 1, max: min((availableProcessors + 1) / 2, 10), keepAlive: 5m |
WARMER | SCALING | Segment warm-up operations | core: 1, max: min((availableProcessors + 1) / 2, 5), keepAlive: 5m |
SNAPSHOT | SCALING | Snapshot operations | core: 1, max: min((availableProcessors + 1) / 2, 5), keepAlive: 5m |
FETCH_SHARD_STARTED | SCALING | Fetching started-shard information | core: 1, max: availableProcessors * 2, keepAlive: 5m |
FETCH_SHARD_STORE | SCALING | Fetching shard store information | core: 1, max: availableProcessors * 2, keepAlive: 5m |
SEARCH | FIXED_AUTO_QUEUE_SIZE | Search/count/suggest requests | size: ((availableProcessors * 3) / 2) + 1, initialQueueSize: 1000, minQueueSize: 1000, maxQueueSize: 1000 |
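The behaviour of a FIXED pool (fixed thread count, bounded queue, extra work turned away) can be observed with the JDK's own `ThreadPoolExecutor`. This is a stand-in built on `java.util.concurrent`, not the executor that ES constructs through `EsExecutors`; the class `FixedPoolSketch` and the helper `countRejections` are hypothetical. The example uses size 1 and queue size 16, the values listed for ANALYZE in the table above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class FixedPoolSketch {
    /** A fixed number of threads plus a bounded queue; work beyond that is rejected. */
    static ThreadPoolExecutor newFixed(int size, int queueSize) {
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(queueSize), new ThreadPoolExecutor.AbortPolicy());
    }

    /** Submits {@code tasks} blocking tasks and returns how many of them were rejected. */
    static int countRejections(int size, int queueSize, int tasks) {
        ThreadPoolExecutor pool = newFixed(size, queueSize);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // hold the thread so later tasks must queue
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // all threads busy and the queue is full
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejections(1, 16, 20));
    }
}
```

With one thread and sixteen queue slots, seventeen of the twenty tasks are accepted and the remaining three are rejected, which is the situation a client sees as a rejected-execution error when a FIXED pool is saturated.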
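- Create the NodeClient
- Create the service objects (xxxService) and use Guice to register the modules (xxxModule) that the services use
- Service:
Service | Description |
---|---|
ResourceWatcherService | Monitors the resources used by the various services |
NetworkService | Manages network configuration such as TCP, IP, and ports |
ClusterService | Cluster management: publishing and updating the cluster state |
IngestService | Pre-processing of incoming data on ingest nodes |
ClusterInfoService | Retrieves the latest cluster information |
UsageService | Tracks usage of Elasticsearch features |
MonitorService | Monitoring of the JVM, the process, and the operating system |
CircuitBreakerService | Circuit breaking: blocks tasks when resource usage exceeds limits |
MetaStateService | Reads and writes Metadata and IndexMetadata |
IndicesService | Index management: creating and deleting indices, etc. |
IndicesClusterStateService | Applies index-related changes when the cluster state is updated |
MetaDataIndexUpgradeService | Upgrades IndexMetadata to the latest version |
TemplateUpgradeService | Upgrades plugin-related templates when a node joins the cluster |
TransportService | Transport-layer network service |
HttpServerTransport | HTTP-layer network service that exposes the REST API |
ResponseCollectorService | Collects statistics on queue size, response time, and service time of tasks executed on each node |
NodeService | An instance of a node, responsible for calling the various services |
SearchService | Handles search tasks |
SnapshotsService | Snapshot service |
Discovery | Cluster discovery service |
RoutingService | Routing table management |
GatewayService | Persistence and recovery of cluster and index metadata |
- Module: ScriptModule, AnalysisModule, SettingsModule, PluginModule, ClusterModule, IndicesModule, SearchModule, GatewayModule, RepositoriesModule, ActionModule, NetworkModule, DiscoveryModule. For what each module does, refer to the Service of the same name above.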
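The pattern of binding each service to an instance at construction time and fetching it again by type later can be illustrated with a tiny hand-rolled registry. This is deliberately not Guice and does not use any Guice API; it is a hypothetical stand-in (`InstanceRegistrySketch`, `SearchServiceLike`, `TransportServiceLike`) that only mirrors the shape of `b.bind(X.class).toInstance(x)` and `injector.getInstance(X.class)`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class InstanceRegistrySketch {
    private final Map<Class<?>, Object> bindings = new LinkedHashMap<>();

    /** Registers {@code instance} as the single object served for {@code type}. */
    <T> void bindInstance(Class<T> type, T instance) {
        if (bindings.putIfAbsent(type, instance) != null) {
            throw new IllegalStateException("already bound: " + type.getName());
        }
    }

    /** Looks up the instance previously bound for {@code type}. */
    <T> T getInstance(Class<T> type) {
        Object instance = bindings.get(type);
        if (instance == null) {
            throw new IllegalStateException("no binding for " + type.getName());
        }
        return type.cast(instance);
    }

    // Hypothetical services with a start() method
    static final class SearchServiceLike {
        boolean started;
        void start() { started = true; }
    }

    static final class TransportServiceLike {
        boolean started;
        void start() { started = true; }
    }

    public static void main(String[] args) {
        InstanceRegistrySketch injector = new InstanceRegistrySketch();
        // Construction phase: bind every service to its instance
        injector.bindInstance(SearchServiceLike.class, new SearchServiceLike());
        injector.bindInstance(TransportServiceLike.class, new TransportServiceLike());
        // Start phase: fetch the same instances by type and start them
        injector.getInstance(SearchServiceLike.class).start();
        injector.getInstance(TransportServiceLike.class).start();
        System.out.println("search started: " + injector.getInstance(SearchServiceLike.class).started);
    }
}
```

Separating the two phases is what lets the Node constructor wire everything up without starting anything, leaving the ordering of `start()` calls to a single place.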
Step 3: Start the Node
PATH
elasticsearch\node\Node.java
CODE
/**
* Start the node. If the node is already started, this method is no-op.
*/
public Node start() throws NodeValidationException {
// 1. State machine: move the local node's state to STARTED
if (!lifecycle.moveToStarted()) {
return this;
}
logger.info("starting ...");
// LifecycleComponent in modules and plugins start
pluginLifecycleComponents.forEach(LifecycleComponent::start);
// 2. Fetch the instances bound for the modules and services when the Node was created, and start them
// AbstractLifecycleComponent.start() -> class.doStart()
injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start();
nodeService.getMonitorService().start();
...
Discovery discovery = injector.getInstance(Discovery.class);
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
// Start the transport service
TransportService transportService = injector.getInstance(TransportService.class);
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
transportService.start();
// Load the local MetaData
final MetaData onDiskMetadata;
try {
if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
} else {
onDiskMetadata = MetaData.EMPTY_META_DATA;
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
// Run the bootstrap checks: BootstrapChecks.check
validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(),
pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
// Initialize the ClusterState and start Discovery
discovery.start();
// Start clusterService, clusterApplierService, and masterService
clusterService.start();
// The transport layer is up and starts accepting requests
transportService.acceptIncomingRequests();
// initial discovery -> ZenDiscovery.java innerJoinCluster(): join the cluster
discovery.startInitialJoin();
// The HTTP layer starts and begins accepting requests
injector.getInstance(HttpServerTransport.class).start();
// The node has started successfully
logger.info("started");
pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);
return this;
}
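Analysis
Starting the node mainly means fetching the instances bound for each service and module and calling start() on each of them (which actually runs the class's doStart()) to bring up the services. The most important steps are:
- Start the transport service so that the node can later join the cluster through discovery
- If the node is master-eligible or a data node, load the local metadata to recover information about the previous cluster (the case of a node restarting after a crash)
- Run the bootstrap checks, which inspect the environment ES runs in, mainly operating system and JVM parameters, as shown in the figure in the original article. If certain checks fail, ES reports an error and exits. See Bootstrap Checks in the official documentation for what each check means.
- Start discovery and clusterService and initialize the cluster state (ClusterState)
- Open the transport layer to incoming requests, which are used for inter-node communication
- Start the initial discovery and join the Elasticsearch cluster the node belongs to
- Start the HTTP service and begin accepting user requests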
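The guard at the top of `Node#start`, where `lifecycle.moveToStarted()` makes a second call a no-op, can be sketched as a small state machine. This is a simplified illustration, not the ES `Lifecycle` class: the states and transition rules below are assumptions chosen to match the behaviour described in the Javadoc ("If the node is already started, this method is no-op"), and `NodeLike` is a hypothetical stand-in for Node.

```java
final class LifecycleSketch {
    enum State { INITIALIZED, STARTED, STOPPED, CLOSED }

    private volatile State state = State.INITIALIZED;

    State state() {
        return state;
    }

    /** Returns true if the state changed to STARTED, false if it was already STARTED. */
    boolean moveToStarted() {
        State local = state;
        if (local == State.INITIALIZED || local == State.STOPPED) {
            state = State.STARTED;
            return true;
        }
        if (local == State.STARTED) {
            return false;
        }
        throw new IllegalStateException("Can't move to started state when closed");
    }

    void close() {
        state = State.CLOSED;
    }

    /** Hypothetical node whose start() is idempotent thanks to the lifecycle guard. */
    static final class NodeLike {
        final LifecycleSketch lifecycle = new LifecycleSketch();
        int startCount;

        NodeLike start() {
            if (!lifecycle.moveToStarted()) {
                return this; // already started: do nothing
            }
            startCount++; // stands in for starting all services
            return this;
        }
    }

    public static void main(String[] args) {
        NodeLike node = new NodeLike().start().start();
        System.out.println("services started " + node.startCount + " time(s), state " + node.lifecycle.state());
    }
}
```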
Startup Log
Finally, we use the log of an ES node to verify the startup flow described above.
# Node creation begins
[2018-12-28T19:54:44,159][INFO ][o.e.n.Node ] [] initializing ...
# Read local directory and JVM information, create the NodeEnvironment
[2018-12-28T19:54:44,376][INFO ][o.e.e.NodeEnvironment ] [V9VXhfr] using [1] data paths, mounts [[项目 (D:)]], net usable_space [271.7gb], net total_space [310gb], types [NTFS]
[2018-12-28T19:54:44,376][INFO ][o.e.e.NodeEnvironment ] [V9VXhfr] heap size [1gb], compressed ordinary object pointers [true]
# Log node information such as the node name
[2018-12-28T19:54:44,379][INFO ][o.e.n.Node ] [V9VXhfr] node name derived from node ID [V9VXhfr9TvSyUfxyr-ZQWg]; set [node.name] to override
[2018-12-28T19:54:44,379][INFO ][o.e.n.Node ] [V9VXhfr] version[6.4.3-SNAPSHOT], pid[19512], build[unknown/unknown/Unknown/Unknown], OS[Windows 7/6.1/amd64], JVM["Oracle Corporation"/Java HotSpot(TM) 64-Bit Server VM/10.0.1/10.0.1+10]
# Log JVM information
[2018-12-28T19:54:44,379][INFO ][o.e.n.Node ] [V9VXhfr] JVM arguments [-agentlib:jdwp=transport=dt_socket,address=127.0.0.1:8831,suspend=y,server=n, -Des.path.home=D:\elasticsearch_release\elasticsearch-6.4.3, -Des.path.conf=D:\elasticsearch_release\elasticsearch-6.4.3\config, -Djava.security.policy=D:\elasticsearch_release\elasticsearch-6.4.3\config\java.policy, -Dlog4j2.disable.jmx=true, -Xms1g, -Xmx1g, -javaagent:C:\Users\morningchen\.IntelliJIdea2018.3\system\groovyHotSwap\gragent.jar, -javaagent:C:\Users\morningchen\.IntelliJIdea2018.3\system\captureAgent\debugger-agent.jar, -Dfile.encoding=UTF-8]
# Initialize the Services and Modules
# Load the modules
[2018-12-28T19:54:53,359][INFO ][o.e.p.PluginsService ] [V9VXhfr] loaded module [aggs-matrix-stats]
[2018-12-28T19:54:53,359][INFO ][o.e.p.PluginsService ] [V9VXhfr] loaded module [analysis-common]
...
[2018-12-28T19:54:53,362][INFO ][o.e.p.PluginsService ] [V9VXhfr] loaded module [x-pack-watcher]
# Load plugins
[2018-12-28T19:54:53,362][INFO ][o.e.p.PluginsService ] [V9VXhfr] no plugins loaded
[2018-12-28T19:54:58,078][DEBUG][o.e.a.ActionModule ] Using REST wrapper from plugin org.elasticsearch.xpack.security.Security
[2018-12-28T19:54:58,271][INFO ][o.e.d.DiscoveryModule ] [V9VXhfr] using discovery type [zen]
[2018-12-28T19:54:59,102][INFO ][o.e.n.Node ] [V9VXhfr] initialized
# Node startup begins
[2018-12-28T19:54:59,102][INFO ][o.e.n.Node ] [V9VXhfr] starting ...
# Start the Transport service
[2018-12-28T19:54:59,428][INFO ][o.e.t.TransportService ] [V9VXhfr] publish_address {10.40.98.48:9300}, bound_addresses {127.0.0.1:9300}, {[::1]:9300}
# BootstrapChecks
[2018-12-28T19:54:59,442][INFO ][o.e.b.BootstrapChecks ] [V9VXhfr] bound or publishing to a non-loopback address, enforcing bootstrap checks
# Join the cluster
[2018-12-28T19:55:54,939][INFO ][o.e.c.s.MasterService ] [V9VXhfr] zen-disco-elected-as-master ([0] nodes joined)[, ], reason: new_master {V9VXhfr}{V9VXhfr9TvSyUfxyr-ZQWg}{m7N1OuBHRSuJlyGOoIJmqw}{10.40.98.48}{10.40.98.48:9300}{ml.machine_memory=17099067392, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}
[2018-12-28T19:55:54,945][INFO ][o.e.c.s.ClusterApplierService] [V9VXhfr] new_master {V9VXhfr}{V9VXhfr9TvSyUfxyr-ZQWg}{m7N1OuBHRSuJlyGOoIJmqw}{10.40.98.48}{10.40.98.48:9300}{ml.machine_memory=17099067392, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}, reason: apply cluster state (from master [master {V9VXhfr}{V9VXhfr9TvSyUfxyr-ZQWg}{m7N1OuBHRSuJlyGOoIJmqw}{10.40.98.48}{10.40.98.48:9300}{ml.machine_memory=17099067392, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true} committed version [1] source [zen-disco-elected-as-master ([0] nodes joined)[, ]]])
# Start the HTTP service
[2018-12-28T19:55:58,722][INFO ][o.e.x.s.t.n.SecurityNetty4HttpServerTransport] [V9VXhfr] publish_address {10.40.98.48:9200}, bound_addresses {127.0.0.1:9200}, {[::1]:9200}
# Node startup is complete
[2018-12-28T19:55:58,723][INFO ][o.e.n.Node ] [V9VXhfr] started
# License check
[2018-12-28T19:55:58,902][INFO ][o.e.l.LicenseService ] [V9VXhfr] license [a4e08819-7a8b-4017-8b85-5329bc2909b0] mode [basic] - valid
# Begin recovering local data
[2018-12-28T19:55:58,926][INFO ][o.e.g.GatewayService ] [V9VXhfr] recovered [0] indices into cluster_state
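To see how long each phase took, the timestamps in a log like the one above can be compared programmatically. The following sketch assumes the line layout seen in this log (`[timestamp][LEVEL][logger] message`); the class `StartupLogSketch`, its regular expression, and its method names are hypothetical and are not part of Elasticsearch.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class StartupLogSketch {
    // [timestamp][LEVEL ][logger ] message
    private static final Pattern LINE =
        Pattern.compile("^\\[([^\\]]+)\\]\\[([A-Z]+)\\s*\\]\\[([^\\]\\s]+)\\s*\\]\\s*(.*)$");
    private static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss,SSS");

    private static Matcher match(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            throw new IllegalArgumentException("not a log line: " + line);
        }
        return m;
    }

    static LocalDateTime timestamp(String line) {
        return LocalDateTime.parse(match(line).group(1), TS);
    }

    static String level(String line) {
        return match(line).group(2);
    }

    static String logger(String line) {
        return match(line).group(3);
    }

    /** Time elapsed between two log lines. */
    static Duration between(String earlier, String later) {
        return Duration.between(timestamp(earlier), timestamp(later));
    }

    public static void main(String[] args) {
        String initializing = "[2018-12-28T19:54:44,159][INFO ][o.e.n.Node ] [] initializing ...";
        String started = "[2018-12-28T19:55:58,723][INFO ][o.e.n.Node ] [V9VXhfr] started";
        System.out.println("startup took " + between(initializing, started).toMillis() + " ms");
    }
}
```

For the log above, the interval from "initializing ..." to "started" is 74.564 seconds, most of it spent between the bootstrap checks and the master election.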
Author: morningchen, Tencent engineer
Source: Tencent Cloud
Original link: cloud.tencent.com/developer/articl...