当前位置 博文首页 > 静若清池:将Flink计算完毕后的数据Sink到Nebula

    静若清池:将Flink计算完毕后的数据Sink到Nebula

    作者:静若清池 时间:2021-07-03 18:25

    Flink是目前流计算的隐形王者,在国际国内有有庞大的拥趸。

    Nebula是国产图数据库的后起之秀,在DBEngines中排名也逐年上升。

    将两者进行结合,可以产生很多应用场景:比如实时计算服务链路调用关系并将结果存到Nebula中、实时计算业务访问风控情况并将结果存到Nebula中、实时计算预警发生情况并将结果存到Nebula中等。

    将Flink计算完毕后的结果,Sink到Nebula,Nebula官方提供了一个Flink Connector,但是很不易用。

    笔者根据项目实际应用情况,写了一个更简洁直接的Sink,作为抛砖引玉,欢迎各位Flink及Nebula爱好者共同交流。

    一、NebulaUtil

    由于Nebula提供的Java Client是非线程安全的,所以我们首先封装一个单例的NebulaUtil,主要代码如下:

    import lombok.val;
    import lombok.var;
    /**
     * Nebula工具类
     */
    public class NebulaUtil {
        // Nebula会话
        private Session session = null;
        // Nebula连接池
        private NebulaPool pool = new NebulaPool();/**
         * 获得Nebula工具类单例
         *
         * @return NebulaUtil
         */
        public static NebulaUtil getInstance() {
            return NebulaUtilHolder.instance;
        }
    
        /**
         * 执行NGQL
         *
         * @param nGQL NGQL
         * @return 返回执行结果
         */
        public ResultSet execute(String nGQL) {
            try {
                if (session != null) {
                    return session.execute(nGQL);
                }
            } catch (IOErrorException e) {
                e.printStackTrace();
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
    
            return null;
        }
       /**
         * 释放会话
         */
        public void releaseSession() {
            // 释放连接
            if (session != null) {
                session.release();
            }
    
            // 关闭连接池
            pool.close();
        }
    
        private static class NebulaUtilHolder {
            private static final NebulaUtil instance = new NebulaUtil();
        }
    
        private NebulaUtil() {
            initSession();
        }
    
        /**
         * 初始化会话
         */
        private void initSession() {// 连接地址,多个间用逗号“,”隔开
            val host = "127.0.0.1";
            val port = 9669;
            val user = "user";
            val password = "password";
            val space = "MySpace";
    
            var nebulaPoolConfig = new NebulaPoolConfig();
            nebulaPoolConfig.setMaxConnSize(100);
    
            var hostAddressList = new ArrayList<HostAddress>();
    
            val hostArray = host.split(",");
    
            for (val hostAddress : hostArray) {
                hostAddressList.add(new HostAddress(hostAddress, port));
            }
    
            try {
                pool.init(hostAddressList, nebulaPoolConfig);
            } catch (UnknownHostException e) {
               e.printStackTrace();
            }
    
            try {
                session = pool.getSession(user, password, false);
            } catch (NotValidConnectionException e) {
                e.printStackTrace();
            } catch (IOErrorException e) {
               e.printStackTrace();
            } catch (AuthFailedException e) {
                e.printStackTrace();
            }
    
            // 切换图空间
            val resp = execute(String.format("USE %s;", space));
    
            if (resp == null || !resp.isSucceeded()) {
                System.out.println("切换图空间失败!" + space);
            }
        }
    }

     

    二、NebulaSink

    有了NebulaUtil,实现NebulaSink就非常简单了,每个方法里只有几行代码:

    import lombok.val;

    /** * Sink到Nebula数据库 */ public class NebulaSink extends RichSinkFunction<List<String>> { /** * 打开连接 * * @param parameters 配置参数 */ @Override public void open(Configuration parameters) { } /** * 调用 * * @param nGQLList NGQL列表 * @param context 上下文 */ @Override public void invoke(List<String> nGQLList, Context context) { for (val nGQL : nGQLList) { NebulaUtil.getInstance().execute(nGQL); } } /** * 关闭连接 */ @Override public void close() throws Exception { super.close(); NebulaUtil.getInstance().releaseSession(); } }

     

    三、将Vertex及Edge数据组装成NGQL语句

    有了NebulaUtil以及NebulaSink后,Sink到Nebula之前,我们主要的工作就是将Vertex及Edge数据,组装对应的NGQL语句即可。

     

    bkbky
    下一篇:没有了