第一章 学习jetlinks 网络组件
网络组件的作用
用于管理各种网络服务(MQTT,TCP等),动态配置,启停.>用于管理各种网络服务(MQTT,TCP等),动态配置,启停. 只负责接收,发送报文,不负责任何处理逻辑。
1.启动网络组件
// 代码路径 org.jetlinks.community.network.manager.web/ NetworkConfigController
@PostMapping("/{id}/_start")
@SaveAction
@Operation(summary = "启动网络组件")
public Mono<Void> start(@PathVariable
@Parameter(description = "网络组件ID") String id) {
// 1.查看配置是否存在,不存在抛出没有发现的异常
// 2. 更新状态为 启动状态
// 3. 网络组件管理器 加载该组件 让其启动
return configService.findById(id)
.switchIfEmpty(Mono.error(() -> new NotFoundException("配置[" + id + "]不存在")))
.flatMap(conf -> configService.createUpdate()
.set(NetworkConfigEntity::getState, NetworkConfigState.enabled)
.where(conf::getId)
.execute()
.thenReturn(conf))
.flatMap(conf -> networkManager.reload(conf.lookupNetworkType(), id));
}
2. 加载网络组件
//代码路径: org.jetlinks.community.network/ DefaultNetworkManager
@Override
public Mono<Void> reload(NetworkType type, String id) {
// 1.justOrEmpty 可以保证传入参数为空时也不会报错 判断是否启动网络组件
// 存在就停止,再重新获得该网络组件
return Mono.justOrEmpty(getNetworkStore(type)
.get(id))
.doOnNext(Network::shutdown)
.then(getNetwork(type, id))
.then();
}
@Override
public <T extends Network> Mono<T> getNetwork(NetworkType type, String id) {
Map<String, Network> networkMap = getNetworkStore(type);
return Mono.justOrEmpty(networkMap.get(id))
.filter(Network::isAlive)
.switchIfEmpty(Mono.defer(() -> createNetwork(type, id)))
.map(n -> (T) n);
}
// 创建网络 查看网络组件配置是否存在
public Mono<Network> createNetwork(NetworkType type, String id) {
return Mono.justOrEmpty(providerSupport.get(type.getId()))
.switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("不支持的类型:" + type.getName())))
.flatMap(provider -> configManager
.getConfig(type, id)
.switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("网络[" + type.getName() + "]配置[" + id + "]不存在")))
.filter(NetworkProperties::isEnabled)
.switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("网络[" + type.getName() + "]配置[" + id + "]已禁用")))
.flatMap(provider::createConfig)
.map(config -> doCreate(provider, id, config)));
}
创建网络组件
// 代码路径: org.jetlinks.community.network/ DefaultNetworkManager
/**
* 如果store中不存在网络组件就创建,存在就重新加载
*
* @param provider 网络组件支持提供商
* @param id 网络组件唯一标识
* @param properties 网络组件配置
* @return 网络组件
*/
public Network doCreate(NetworkProvider<Object> provider, String id, Object properties) {
return getNetworkStore(provider.getType()).compute(id, (s, network) -> {
if (network == null) {
network = provider.createNetwork(properties);
} else {
//单例,已经存在则重新加载
provider.reload(network, properties);
}
return network;
});
}
// 代码路径 org.jetlinks.community.network.tcp.server/TcpServerProvider.class
// 使用 vertx 量创建 tcp 服务器
@Nonnull
@Override
public VertxTcpServer createNetwork(@Nonnull TcpServerProperties properties) {
VertxTcpServer tcpServer = new VertxTcpServer(properties.getId());
initTcpServer(tcpServer, properties);
return tcpServer;
}
/**
* TCP服务初始化
*
* @param tcpServer TCP服务
* @param properties TCP配置
*/
private void initTcpServer(VertxTcpServer tcpServer, TcpServerProperties properties) {
int instance = Math.max(2, properties.getInstance());
List<NetServer> instances = new ArrayList<>(instance);
for (int i = 0; i < instance; i++) {
instances.add(vertx.createNetServer(properties.getOptions()));
}
// 根据解析类型配置数据解析器
Supplier<PayloadParser> parser= payloadParserBuilder.build(properties.getParserType(), properties);
parser.get();
tcpServer.setParserSupplier(parser);
tcpServer.setServer(instances);
tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout", Duration.ofMinutes(10).toMillis()));
// 针对JVM做的多路复用优化
// 多个server listen同一个端口,每个client连接的时候vertx会分配
// 一个connection只能在一个server中处理
for (NetServer netServer : instances) {
// 监听端口
netServer.listen(properties.createSocketAddress(), result -> {
if (result.succeeded()) {
// 这里说明 tcp server 已经启动成功
log.info("tcp server startup on {}", result.result().actualPort());
} else {
log.error("startup tcp server error", result.cause());
}
});
}
}
上面的网络组件既然已经启动成功了,那么又是如何接受报文和发送报文呢 ?
// 有上面的代码 tcpServer.setServer(instances) 调用引入下面代码来处理
// 代码路径 org.jetlinks.community.network.tcp.server/VertxTcpServer
/**
* 为每个NetServer添加connectHandler 处理连接
*
* @param servers 创建的所有NetServer
*/
public void setServer(Collection<NetServer> servers) {
if (this.tcpServers != null && !this.tcpServers.isEmpty()) {
shutdown();
}
this.tcpServers = servers;
for (NetServer tcpServer : this.tcpServers) {
tcpServer.connectHandler(this::acceptTcpConnection);
}
}
/**
* TCP连接处理逻辑
*
* @param socket socket
*/
protected void acceptTcpConnection(NetSocket socket) {
if (!processor.hasDownstreams()) {
log.warn("not handler for tcp client[{}]", socket.remoteAddress());
socket.close();
return;
}
// 客户端连接处理 这里为什么来 创建客户端来处理连接 代理和解耦 ? 还是提供并发
VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress(), true);
client.setKeepAliveTimeoutMs(keepAliveTimeout);
try {
// TCP异常和关闭处理
socket.exceptionHandler(err -> {
log.error("tcp server client [{}] error", socket.remoteAddress(), err);
}).closeHandler((nil) -> {
log.debug("tcp server client [{}] closed", socket.remoteAddress());
client.shutdown();
});
// 这个地方是在TCP服务初始化的时候设置的 parserSupplier
// set方法 org.jetlinks.community.network.tcp.server.VertxTcpServer.setParserSupplier
// 调用坐标 org.jetlinks.community.network.tcp.server.TcpServerProvider.initTcpServer
client.setRecordParser(parserSupplier.get());
// 这里面真正是去处理数据包
client.setSocket(socket);
// client放进了发射器 这里也不懂 什么 发射器 sink 是下面代码创建出来的
// private final EmitterProcessor<TcpClient> processor = EmitterProcessor.create(false);
// private final FluxSink<TcpClient> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
sink.next(client);
log.debug("accept tcp client [{}] connection", socket.remoteAddress());
} catch (Exception e) {
log.error("create tcp server client error", e);
client.shutdown();
}
处理 socket
// 代码路径 org.jetlinks.community.network.tcp.client/VertxTcpClient.class
/**
* socket处理
*
* @param socket socket
*/
public void setSocket(NetSocket socket) {
synchronized (this) {
Objects.requireNonNull(payloadParser);
if (this.socket != null && this.socket != socket) {
this.socket.close();
}
this.socket = socket
.closeHandler(v -> shutdown())
.handler(buffer -> {
if (log.isDebugEnabled()) {
log.debug("handle tcp client[{}] payload:[{}]",
socket.remoteAddress(),
Hex.encodeHexString(buffer.getBytes()));
}
keepAlive();
// payloadParser 这个又是什么 使用了那个分析器
payloadParser.handle(buffer);
if (this.socket != socket) {
log.warn("tcp client [{}] memory leak ", socket.remoteAddress());
socket.close();
}
});
}
}
// 代码路径 org.jetlinks.community.network.tcp.parser/ PayloadParser 接口
/**
* 处理一个数据包
*
* @param buffer 数据包
*/
void handle(Buffer buffer);
// 找到这里 那么该方法又给谁实现了 ?
// 找到一个实现的方法
// 代码路径 org.jetlinks.community.network.tcp.parser/ DirectRecordParser.class
/**
* 不处理直接返回数据包
*
* @author zhouhao
* @since 1.0
*/
public class DirectRecordParser implements PayloadParser {
private final Sinks.Many<Buffer> sink = Reactors.createMany();
@Override
// 直接返回数据包 依然 网络组件 只管接收和发送数据包,是否就是它了 ?然后就交给网关来
// 来获取 协议来 处理该数据包 然后再做逻辑处理,再发送事件总线
public void handle(Buffer buffer) {
sink.emitNext(buffer, Reactors.emitFailureHandler());
}
@Override
public Flux<Buffer> handlePayload() {
return sink.asFlux();
}
@Override
public void close() {
sink.emitComplete(Reactors.emitFailureHandler());
}
}
本作品采用《CC 协议》,转载必须注明作者和本文链接