ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

FLINK Notebook 混合编程:PYTHON (一)

2021-05-14 22:30:19  阅读:244  来源: 互联网

标签:Flink java PYTHON FLINK python Python Notebook Py4j


本文介绍了 Py4j的使用以及 Flink官方如何使用 Py4j进行混合语言编程,最后会介绍下我们会应用这种技术在我们的 Flink Notebook 服务,来创建一个混合语言编程环境。

Flink Notebook 服务是我司自研的基于Notebook方式的Flink 开发平台,他支持用户通过SQL方式和JAR包方式进行混合编程,并通过一些配置,既可完全的在页面上完成FLINK任务的开发工作,如图:

在这里插入图片描述

​ 通过不同的Notebook Type我们可以加载不同类型的组件,通过table结果集流转方式,承接上下游,以完成相应的功能。目前的插件类型主要是主要分:1.SQL组建:可以自由撰写SQL,2.格式化组建:sink或者source,有具体的格式,标准的前端组建对应,3.JAR包自定义组件,通过用户上传自己开发的jar包完成对应的逻辑。

​ 对于Jar包自定义组件来说,他是为了解决1%的特异性需求的,但问题是其代码不可见,逻辑也相对自由,有违Notebook的初衷,因此,我们想设计一种Notebook的Type,支持可视化的Python编写,可以直接将代码在页面上进行开发。

​ Flink 本身来说,就有PyFlink 和 Python UDF support,因此python和 flink的耦合度应该很高,所以我们要了解Flink是怎么做的,从而研究我们应该如何去做,所以本文会分成以下3个部分来介绍整个混编逻辑:

1. Java与Python 通信:Py4J
2. Py4j in Flink
3. Notebook with Python

Py4j 介绍

Py4j可以使运行于python解释器的python程序动态的访问java虚拟机中的java对象。Java方法可以像java对象就在python解释器里一样被调用, java collection也可以通过标准python collection方法调用。Py4j也可以使java程序回调python对象。

详细说明可以参考官网 https://www.py4j.org/

安装以及基本使用也可以参考官网

Py4j可以在系统中创建一个 java和python 之间通信的socket管道。
在这里插入图片描述
我们可以通过一个例子来看整个Py4j是如何工作的。

我们先创建一个想让python负责具体实现的Java 接口:

public interface TestEnterPoint {
    String gift(HashMap<String,String> a, String b);
}

在java 服务端,我们通过以下代码可以启动一个简单的Py4j监听:

    public static void main(String[] args) {
        ListenerApplication application = new ListenerApplication();
        GatewayServer server = new GatewayServer(application);
        server.start(true);
    }

ListenerApplication 表示一个允许共享给python的类,她可以是任意java类,包括Map,List等复杂结构化数据:

public class ListenerApplication {
    TestEnterPoint enterPoint = new TestEnterPoint();
    public void setListener(TestEnterPoint enterPoint) {
        this.enterPoint = enterPoint;
    }
    public void notifyAllListeners() {
        HashMap<String,String> map = new HashMap<>();
        map.put("a","aaaa");
        Object returnValue = listener.gift(map,"a");
        System.out.println(returnValue);
    }
}

而在Python端,我们可以通过以下代码运行一个python程序:

from py4j.java_gateway import JavaGateway, CallbackServerParameters

class TestEnterPoint(object):
    def gift(self, map, key):
        return map.get(key)
    class Java:
        implements = ["com.xxxx.xxx.test.py.TestEnterPoint"]

if __name__ == "__main__":
    gateway = JavaGateway(
        callback_server_parameters=CallbackServerParameters())
    listener = TestEnterPoint()
    gateway.entry_point.setListener(listener)
    gateway.entry_point.notifyAllListeners()
    gateway.shutdown()

这样我们就通过Python来实现了一个 map.get(key) 的方法

整个过程中,我们看出几点对于python来说比较基本的使用方式,那就是,第一,通过Python 中implements = ["com.xxxx.xxx.test.py.TestEnterPoint"]的使用方式,我们可以实现一个Java的Interface,第二,通过gateway.entry_point的方式,我们可以拿到java中设置的可共享变量,第三个我们在例子中并没有呈现,但也是非常基础的使用,就是通过在python中使用 gateway.jvm.com.xxxx.xxx.test.py.TestServer的方式,允许python使用任何java的class,允许初始化,允许调用方法,但是他们如果想和java端进行数据通信,则必须通过entry_point来实现。

Py4j in Flink

讲完Py4j并且如果把上面的代码自己拿来试下,应该已经对整个python和java互通有一定理解了,那么我们Flink中如何使用Py4J来进行混编,也就顺理成章的很好理解了,在Flink中,有很多地方使用到了这种技术,包括PyFlink,以及Python UDF support,PyFlink 属于Pyton为主,也比较复杂,这边就先就以简单的Python UDF为例,梳理下Flink的执行逻辑。

在Flink Java中如何使用Python UDF

在Flink 中使用Python的UDF相对来说非常简单,创建一个Python代码,比如:

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(s: str):
   return s.replace('hello', 'ni hao')

在Flink Java中,需要配置Python环境变量,首先将Python文件加到环境中去,如果是集群提交,需要加到依赖中去(使用-pyfs 提交Python文件),或者远程的Hdfs文件。其次需要配置Python的程序依赖环境路径:

configuration.setString("python.files", "/Users/yourName/test.py");
configuration.setString("python.client.executable", "python3");
configuration.setString("python.executable", "/usr/bin/python3");

最后,我们在使用过程中,比如通过SQL使用时候,只需要如下SQL语句即可:

create temporary system function func1 as 'test1.func1' language python

其中test1是python的文件名"test1.py"而 func1就是上文中的那个python 的function name,如此既可以在java中使用python实现的UDF

Flink是如何实现这些的

​ 在追踪Flink Sql是如何执行create function过程中,我们发现整个Flink的执行流程大致如图:

在这里插入图片描述

​ Flink会通过语法解析后的通过create function的后缀“ language python”判断是否是Python fuction,如果是,会调用PythonFunctionUtils来获取function,而PythonFunctionUtil最终通过动态加载的PythonFunctionFactory来最终调用Py4j。这里可以看见他的逻辑其实也比较简单,首先就是启动Py4j的Java端server,然后主要就是通过环境变量,以及configture 里的各种参数,最终拼接出python的cmd 执行命令,运行命令并通过entryPoint获取其中的贡献类。最终生成我们在java端可以用的function。

​ 这块如果有兴趣,在Flink源码中搜索 PythonFunctionFactory 可以直接看见相关代码。

Notebook 混编Python

我们平台是类似Zeeplin的可视化Notebook编程页面,对于我们来说,要在页面上支持Python编程,有几种方案:
  • 只支持Python UDF
  • 以PyFlink为基础,配置混合编程方案
  • 以Java版为基础,配置混合编程方案

方案一对于我们来说并不难,可以看到Flink官方既是支持Python UDF的,我们只需要将这个Notebook Part里的内容,生成Python文件并添加到环境中一起提交即可,但这种方案没法解决我们上面提出的一大痛点,用户的1%需要Jar包开发的非标任务,不是单单可以通过UDF来实现的。

方案二对于我们来说,最大的问题是所有的优化,整个程序体系都是建立在Java 基础上的,改动会非常巨大。

如此,只能采取方案三,而方案三的问题是,Flink的原版PyFlink只创建了 PythonFunctionFactory 和一个 心跳2个 entryPoint,这对我们来说比较局限。所以我们会采取模仿 PythonFunctionFactory 的方式,自己创建Py4j进程,来完成Notebook的混编实现

这里的详细设计以及Demo 我们会在下篇文章(二)中放出。谢谢各位。

标签:Flink,java,PYTHON,FLINK,python,Python,Notebook,Py4j
来源: https://blog.csdn.net/wty19/article/details/116573129

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有