第一章 学习jetlinks 网络组件

网络组件的作用

用于管理各种网络服务(MQTT,TCP等),动态配置,启停.>用于管理各种网络服务(MQTT,TCP等),动态配置,启停. 只负责接收,发送报文,不负责任何处理逻辑。

1.启动网络组件

    // 代码路径 org.jetlinks.community.network.manager.web/ NetworkConfigController
    @PostMapping("/{id}/_start")
    @SaveAction
    @Operation(summary = "启动网络组件")
    public Mono<Void> start(@PathVariable
                            @Parameter(description = "网络组件ID") String id) {

        // 1.查看配置是否存在,不存在抛出没有发现的异常
        // 2. 更新状态为 启动状态
        // 3. 网络组件管理器 加载该组件 让其启动
        return configService.findById(id)
            .switchIfEmpty(Mono.error(() -> new NotFoundException("配置[" + id + "]不存在")))
            .flatMap(conf -> configService.createUpdate()
                .set(NetworkConfigEntity::getState, NetworkConfigState.enabled)
                .where(conf::getId)
                .execute()
                .thenReturn(conf))
            .flatMap(conf -> networkManager.reload(conf.lookupNetworkType(), id));
    }

2. 加载网络组件

//代码路径: org.jetlinks.community.network/ DefaultNetworkManager

    @Override
    public Mono<Void> reload(NetworkType type, String id) {
        // 1.justOrEmpty 可以保证传入参数为空时也不会报错 判断是否启动网络组件
        // 存在就停止,再重新获得该网络组件
        return Mono.justOrEmpty(getNetworkStore(type)
            .get(id))
            .doOnNext(Network::shutdown)
            .then(getNetwork(type, id))
            .then();
    }

    @Override
    public <T extends Network> Mono<T> getNetwork(NetworkType type, String id) {
        Map<String, Network> networkMap = getNetworkStore(type);
        return Mono.justOrEmpty(networkMap.get(id))
            .filter(Network::isAlive)
            .switchIfEmpty(Mono.defer(() -> createNetwork(type, id)))
            .map(n -> (T) n);
    }
    // 创建网络 查看网络组件配置是否存在
    public Mono<Network> createNetwork(NetworkType type, String id) {
        return Mono.justOrEmpty(providerSupport.get(type.getId()))
            .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("不支持的类型:" + type.getName())))
            .flatMap(provider -> configManager
                .getConfig(type, id)
                .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("网络[" + type.getName() + "]配置[" + id + "]不存在")))
                .filter(NetworkProperties::isEnabled)
                .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("网络[" + type.getName() + "]配置[" + id + "]已禁用")))
                .flatMap(provider::createConfig)
                .map(config -> doCreate(provider, id, config)));
    }

创建网络组件

   // 代码路径: org.jetlinks.community.network/ DefaultNetworkManager
   /**
     * 如果store中不存在网络组件就创建,存在就重新加载
     *
     * @param provider   网络组件支持提供商
     * @param id         网络组件唯一标识
     * @param properties 网络组件配置
     * @return 网络组件
     */
    public Network doCreate(NetworkProvider<Object> provider, String id, Object properties) {
        return getNetworkStore(provider.getType()).compute(id, (s, network) -> {
            if (network == null) {
                network = provider.createNetwork(properties);
            } else {
                //单例,已经存在则重新加载
                provider.reload(network, properties);
            }
            return network;
        });
    }
  // 代码路径 org.jetlinks.community.network.tcp.server/TcpServerProvider.class
  // 使用 vertx 量创建 tcp 服务器 
    @Nonnull
    @Override
    public VertxTcpServer createNetwork(@Nonnull TcpServerProperties properties) {

        VertxTcpServer tcpServer = new VertxTcpServer(properties.getId());
        initTcpServer(tcpServer, properties);

        return tcpServer;
    }

     /**
     * TCP服务初始化
     *
     * @param tcpServer  TCP服务
     * @param properties TCP配置
     */
    private void initTcpServer(VertxTcpServer tcpServer, TcpServerProperties properties) {
        int instance = Math.max(2, properties.getInstance());
        List<NetServer> instances = new ArrayList<>(instance);
        for (int i = 0; i < instance; i++) {
            instances.add(vertx.createNetServer(properties.getOptions()));
        }
        // 根据解析类型配置数据解析器
        Supplier<PayloadParser> parser= payloadParserBuilder.build(properties.getParserType(), properties);
        parser.get();
        tcpServer.setParserSupplier(parser);
        tcpServer.setServer(instances);
        tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout", Duration.ofMinutes(10).toMillis()));
        // 针对JVM做的多路复用优化
        // 多个server listen同一个端口,每个client连接的时候vertx会分配
        // 一个connection只能在一个server中处理
        for (NetServer netServer : instances) {
            // 监听端口 
            netServer.listen(properties.createSocketAddress(), result -> {
                if (result.succeeded()) {
                    // 这里说明 tcp server 已经启动成功 
                    log.info("tcp server startup on {}", result.result().actualPort());
                } else {
                    log.error("startup tcp server error", result.cause());
                }
            });
        }
    }

上面的网络组件既然已经启动成功了,那么又是如何接受报文和发送报文呢 ?

     // 有上面的代码   tcpServer.setServer(instances) 调用引入下面代码来处理
    // 代码路径 org.jetlinks.community.network.tcp.server/VertxTcpServer
    /**
     * 为每个NetServer添加connectHandler  处理连接
     *
     * @param servers 创建的所有NetServer
     */
    public void setServer(Collection<NetServer> servers) {
        if (this.tcpServers != null && !this.tcpServers.isEmpty()) {
            shutdown();
        }
        this.tcpServers = servers;

        for (NetServer tcpServer : this.tcpServers) {
            tcpServer.connectHandler(this::acceptTcpConnection);
        }

    }

     /**
     * TCP连接处理逻辑
     *
     * @param socket socket
     */
    protected void acceptTcpConnection(NetSocket socket) {
        if (!processor.hasDownstreams()) {
            log.warn("not handler for tcp client[{}]", socket.remoteAddress());
            socket.close();
            return;
        }
        // 客户端连接处理  这里为什么来 创建客户端来处理连接 代理和解耦 ? 还是提供并发
        VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress(), true);
        client.setKeepAliveTimeoutMs(keepAliveTimeout);
        try {
            // TCP异常和关闭处理
            socket.exceptionHandler(err -> {
                log.error("tcp server client [{}] error", socket.remoteAddress(), err);
            }).closeHandler((nil) -> {
                log.debug("tcp server client [{}] closed", socket.remoteAddress());
                client.shutdown();
            });
            // 这个地方是在TCP服务初始化的时候设置的 parserSupplier
            // set方法 org.jetlinks.community.network.tcp.server.VertxTcpServer.setParserSupplier
            // 调用坐标 org.jetlinks.community.network.tcp.server.TcpServerProvider.initTcpServer
            client.setRecordParser(parserSupplier.get());
            // 这里面真正是去处理数据包
            client.setSocket(socket);
            // client放进了发射器  这里也不懂 什么 发射器  sink 是下面代码创建出来的
            // private final EmitterProcessor<TcpClient> processor = EmitterProcessor.create(false);
           //  private final FluxSink<TcpClient> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
            sink.next(client);
            log.debug("accept tcp client [{}] connection", socket.remoteAddress());
        } catch (Exception e) {
            log.error("create tcp server client error", e);
            client.shutdown();
        }

处理 socket


    // 代码路径  org.jetlinks.community.network.tcp.client/VertxTcpClient.class
    /**
     * socket处理
     *
     * @param socket socket
     */
    public void setSocket(NetSocket socket) {
        synchronized (this) {
            Objects.requireNonNull(payloadParser);
            if (this.socket != null && this.socket != socket) {
                this.socket.close();
            }
            this.socket = socket
                .closeHandler(v -> shutdown())
                .handler(buffer -> {
                    if (log.isDebugEnabled()) {
                        log.debug("handle tcp client[{}] payload:[{}]",
                            socket.remoteAddress(),
                            Hex.encodeHexString(buffer.getBytes()));
                    }
                    keepAlive();
                    // payloadParser 这个又是什么 使用了那个分析器
                    payloadParser.handle(buffer);
                    if (this.socket != socket) {
                        log.warn("tcp client [{}] memory leak ", socket.remoteAddress());
                        socket.close();
                    }
                });
        }
    }

    // 代码路径 org.jetlinks.community.network.tcp.parser/ PayloadParser 接口
        /**
     * 处理一个数据包
     *
     * @param buffer 数据包
     */
    void handle(Buffer buffer);
    // 找到这里 那么该方法又给谁实现了 ?

// 找到一个实现的方法
// 代码路径 org.jetlinks.community.network.tcp.parser/ DirectRecordParser.class

/**
 * 不处理直接返回数据包
 *
 * @author zhouhao
 * @since 1.0
 */
public class DirectRecordParser implements PayloadParser {

    private final Sinks.Many<Buffer> sink = Reactors.createMany();

    @Override
    // 直接返回数据包 依然 网络组件 只管接收和发送数据包,是否就是它了 ?然后就交给网关来
    // 来获取 协议来 处理该数据包 然后再做逻辑处理,再发送事件总线 
    public void handle(Buffer buffer) {
        sink.emitNext(buffer, Reactors.emitFailureHandler());
    }

    @Override
    public Flux<Buffer> handlePayload() {
        return sink.asFlux();
    }

    @Override
    public void close() {
        sink.emitComplete(Reactors.emitFailureHandler());
    }
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
70
粉丝
5
喜欢
39
收藏
42
排名:90
访问:9.8 万
私信
所有博文
社区赞助商