Kettle plugin 插件开发,kettleplugin


Kettle本身提供了很多组件,多个组件一起构成一个transformation(转换),多个转换一起构成一个job(任务)。kettle的组件已经非常丰富,在组件不满足需求时可以在kettle上面开发自己的组件,kettle支持的组件开发如下:

 

Kettle中的插件包含两部分:

一是系统本身就已经实现的功能点,在源码目录src中说明,如kettle-steps.xml;

二是系统之外开发的插件,在plugins目录对应插件目录下的plugins.xml说明,如plugins/steps/S3CsvInput/plugins.xml。

 

1.1系统集成插件定义( step )

内容

位置

插件说明信息

engine/src/kettle-steps.xml,所有插件集中说明

插件源码

src-engine与src-ui下,org.pentaho.di.steps.插件名

插件图片

插件说明xml中说明

插件界面文字说明

org.pentaho.di.steps.插件名.messages

插件说明信息中包括描述信息、类名(包括package,反射用)、父级目录(Spoon左侧栏目录)、提示信息和图片信息。Kettle使用国家化方式编程,所以软件中的所有文字描述均由messages_**.properties提供。

系统集成插件配置说明kettle-steps.xml结构:


1.2扩展插件定义(step)

所有新开发的扩展插件,均放在同一的目录下进行管理,插件管理模块会自动去该目录下进行搜索查找。插件目录结构如下所示(以S3CsvInput步骤为例):

Plugins/steps/S3CsvInput

 

 扩展插件定义

内容

位置

插件说明信息

plugins/steps/插件名称/plugin.xml

插件源码

plugins/steps/插件名称/*.jar

插件图片

plugins/steps/插件名称/

插件依赖包

plugins/steps/插件名称/

扩展插件与系统集成插件的说明内容相似,扩展插件增加ID属性和依赖属性,同时他的目录结构、描述信息和提示信息均能进行国际化配置。

1.3扩展插件配置说明plugin.xml结构及参数说明:

 


ID:在kettle插件中必须全局唯一,因为被kettle序列化了,所以不要随便改变

Iconfile: kettle中插件显示的图片,必须是png图片

Description:插件描叙,显示在树形菜单里面。

Tooltip:树形菜单中,鼠标滑过的时候显示的提示信息

Category:插件显示的父目录

Classname:元数据类

Library:指明了插件需要加载所依赖的jar包

 

 

2. Kettle转换步骤扩展插件的开发

2.1 Kettle转换步骤插件至少需要实现四个接口 

org.pentaho.di.trans.step.StepMetaInterface:元数据的处理,加载xml,校验,主要是对一个步骤的定义的基本数据。 

org.pentaho.di.trans.step. StepDataInterface:数据处理涉及的具体数据,以及对数据的状态的设置和回收。 

org.pentaho.di.trans.step. StepInterface:负责数据处理,转换和流转。这里面主要由processRow()方法来处理。 

org.pentaho.di.trans.step. StepDialogInterface:提供GUI/dialog,编辑步骤的元数据。 

对于以上四个接口的实现,都有相应的基类,具体的步骤只需要继承基类和实现相应的接口即可。 

Step扩展接口:

Java 接口

基类

主要功能

StepMetaInterface

BaseStepMeta

存储step设置信息

验证step设置信息

序列化step设置信息

提供获取step类的方法

StepDialogInterface

BaseStepDialog

step属性信息配置窗口

StepInterface

BaseStep

处理rows

StepDataInterface

BaseStepData

为数据处理提高数据存储

 

2.2 Kettle转换步骤插件各个类命名推荐规则 

stepInterface的实现类以插件的功能相关命名:*.java  

stepDataInterface的实现类:*Data.java  

stepMetaInterface的实现类:*Meta.java  

StepDialogInterface的实现类:*Dialog.java

 

2.3  TemplateStepPlugin插件模板各个类源码部分方法说明:

一、元数据类:

下面是元数据的几个关键的方法,注意元数据类里面用私有成员变量outputField存储了下一个步骤的输出字段。

 // keeptrack of the step settings
public String getOutputField()
public void setOutputField(…)
public void setDefault()

// xml类型元数据的写入与加载
public String getXML()
public void loadXML(…)

// 资源库类型元数据的写入与加载
public void readRep(…)
public void saveRep(…)

//提供有关步骤如何影响处理行字段结构的信息
public void getFields(…)

//为步骤进行扩展有效性检查
public void check(…)

// step,data dialog 类提供实例
public StepInterface getStep(…)
public StepDataInterface getStepData()
public StepDialogInterface getDialog(…)

TemplateStepMeta元数据类其实还有很多方面,不过大多被他的父类BaseStepMeta给默认实现了,这些默认的实现足以使我们的元数据类工作良好。 

二、对话框类:

TemeplateStepDialog为步骤实现了对话框的设置,kettle的用户界面部件是使用的eclipseswt框架。在开发过程中,一个对话框对象拥有一个元数据对象,它记录了应该从哪里读取配置?应该把设置好的配置保存在哪里?

 

三、步骤类:

步骤类是实际的处理和转换工作的地方。因为大部分样本代码已经由父类BaseStep提供了,插件开发者只需要关注下面几个特定的方法就行了。

 //初始化和关闭
public boolean 
init(…)
public void 
dispose(..)

// 处理行
public void 
run()
public boolean 
processRow(..)  //步骤主要数据处理方法

Init()方法在转换执行前被kettle调用,转换必须在所有步骤初始化成功时才真正执行。dispose()方法是在步骤执行完之后执行(非转换执行完),它完成资源的关闭,像文件句柄、缓存等等。

run()方法在实际处理记录集的时候调用,其实就是个循环,由上游获取的每条记录交由processRow()方法处理,当此步骤没有数据处理或转换被停止时退出循环。

processRow()方法在处理单条记录的时候被调用。这个方法通常通过调用getRow()来获取需要处理的单条记录。这个方法如果有需要将会被阻塞,例如当此步骤希望放慢脚步处理数据时。processRow()随后的流程将执行转换工作并调用putRow()方法将处理过的记录放到它的下游步骤。 

四、数据类:

大多数步骤都需要临时的缓冲或者临时的存储。数据类就是这些数据合适的存放位置。每一个执行线程拥有的一个数据类的实例,所以它们能在独立的空间里面运行。TemplateStepData继承自BaseStepData,作为一个经验法则,不要将non-constant字段放置BaseStepData类里面,如果一定要,请将它最好放置在派生类TemplateStepData里面.

 

2.4  插件配置

在plugins\steps下新建文件夹TemplateStepPlugin,将编写好的插件源码编译打包成一个jar包,连同插件显示图片和写好的plugin.xml一起放入TemplateStepPlugin。

 

 

 

 

3. 插件注册与查找

3.1插件的注册

Spoon在启动的时候会对所有插件进行注册,并保存在PluginRegistry类里面。平台通过查找PluginRegistry注册表获取插件信息。Kettle安装插件需要进行重启,卸载插件也只需简单的删除plugins目录结构下对应的文件即可。

插件注册时序图:

 

 

   PluginRegistry首选注册本系统的插件类型处理类,源码中注册了10类型PluginRegistry.addPluginType(RowDistributionPluginType.getInstance());

     PluginRegistry.addPluginType(StepPluginType.getInstance());

     PluginRegistry.addPluginType(PartitionerPluginType.getInstance());

     PluginRegistry.addPluginType(JobEntryPluginType.getInstance());

     PluginRegistry.addPluginType(LogTablePluginType.getInstance());

     PluginRegistry.addPluginType(RepositoryPluginType.getInstance());

     PluginRegistry.addPluginType(LifecyclePluginType.getInstance());

     PluginRegistry.addPluginType(KettleLifecyclePluginType.getInstance());

     PluginRegistry.addPluginType(ImportRulePluginType.getInstance());

     PluginRegistry.addPluginType(CartePluginType.getInstance());

此处以StepPluginType为例。注册类型处理类后,PluginRegistry按照不同的类型进行插件搜索(模板模式),基类BasePluginType提供了本地搜索、jar搜索、xml信息搜索3种钩子。根据搜索结果,按照不同的插件类型存储在PluginRegistry中。

 

3.2 插件查找

    PluginRegistry提供了插件查找功能,准确的来说是插件信息的查找功能。以steps在左侧功能栏里面的显示为例,进行插件查找的说明。PluginRegistry提供了getPlugins获取指定插件类型列表、getPlugin获取指定成名插件、getCateories获取目录结构、getClass获取指定插件类等方法。

    左侧显示由Spoon.refreshCoreObjects()函数实现,如果选择时trans相关的内容,将显示所有的step插件。流程图如下所示:


 

实现代码:

if (showTrans) {

      selectionLabel.setText(BaseMessages.getString(PKG, "Spoon.Steps"));

      PluginRegistry registry = PluginRegistry.getInstance();

      final List<PluginInterface> basesteps = registry.getPlugins(StepPluginType.class);    //获取插件信息

      final List<String> basecat = registry.getCategories(StepPluginType.class);    //获取目录信息

      if( stepFilter == null )

      {

      stepFilter = new StepFilterConfigure();

      }

      //items filter...

      for (int i = 0; i <basecat.size(); i++) {       //依次添加获取到的目录

    StringtmpCatName = basecat.get(i).toLowerCase();

    if(stepFilter.getTransCategoriesFilteredList().contains(tmpCatName))

    {

        //filter categories

        continue;

    }

        TreeItem item = new TreeItem(coreObjectsTree, SWT.NONE);

        item.setText(basecat.get(i));

        item.setImage(GUIResource.getInstance().getImageArrow());

        //replace icon

        Image newIcon = null;

        String iconName = "cate_" + basecat.get(i)+ ".png";

        String path = "ui/images/";

        String iconPath = path + iconName;

        newIcon = GUIResource.getInstance().getImageByName(iconPath);

        if( newIcon != null )

        {

        item.setImage(newIcon);

        }

        for (int j = 0; j <basesteps.size(); j++) {     //依次添加获取到的该目录下的插件

          if(basesteps.get(j).getCategory().equalsIgnoreCase(basecat.get(i)) && !stepFilter.getTransStepFilteredList().contains(basesteps.get(j).getName())){

            final Image stepimg = GUIResource.getInstance().getImagesStepsSmall()

               .get(basesteps.get(j).getIds()[0]);

            String pluginName =basesteps.get(j).getName();

            String pluginDescription =basesteps.get(j).getDescription();

            if (!filterMatch(pluginName) &&!filterMatch(pluginDescription))

              continue;

            TreeItem stepItem = new TreeItem(item, SWT.NONE);

            stepItem.setImage(stepimg);

            stepItem.setText(pluginName);

            stepItem.addListener(SWT.Selection, new Listener() {

              public void handleEvent(Eventarg0) {

                //System.out.println("Tree item Listener fired");

              }

            });

            coreStepToolTipMap.put(pluginName,pluginDescription);

          }

        }

      }

    }

相关内容