Azkaban使用示例,azkaban示例


1、Web端管理工作流
Azkaban提供了易操作的Web管理界面,具体操作可参考:http://azkaban.github.io/azkaban/docs/2.5/#ajax-api
值得注意的是,可以使用Azkaban提供的Web界面,定义或覆盖工作流的具体执行参数,操作界面如下:

2.1、登录认证
命令:curl -k -X POST --data "action=login&username=azkaban&password=azkaban" https://localhost:8443
返回json格式:

	{
  		"status" : "success",
  		"session.id" : "c001aba5-a90f-4daf-8f11-62330d034c0a"
	}

2.2、创建一个Project
命令:curl -k -X POST --data "session.id=9089beb2-576d-47e3-b040-86dbdc7f523e&name=aaaa&description=11" https://localhost:8443/manager?action=create
返回json格式:

	{
  		"status":"success",
  		"path":"manager?project=aaaa",
  		"action":"redirect"
	} 

2.3、上传一个Project的zip文件
命令:curl -k -i -H "Content-Type: multipart/mixed" -X POST --form 'session.id=e7a29776-5783-49d7-afa0-b0e688096b5e' --form 'ajax=upload' --form 'file=@myproject.zip;type=application/zip' --form 'project=MyProject' https://localhost:8443/manager
注意:在zip文件的目录下执行该命令,否则无法找到zip文件
返回json格式:

	{
  		"error" : "Installation Failed.\nError unzipping file.",
  		"projectId" : "192",
  		"version" : "1"
	}

2.4、执行工作流
命令:curl -k --get --data 'session.id=189b956b-f39f-421e-9a95-e3117e7543c9' --data 'ajax=executeFlow' --data 'project=azkaban-test-project' --data 'flow=test' https://localhost:8443/executor
返回json格式: 

	{
  		message: "Execution submitted successfully with exec id 295",
  		project: "foo-demo",
  		flow: "test",
  		execid: 295
	}

3、使用Ajax API操作工作流
在程序中执行工作流的话,则需要使用Azkaban提供的ajax api了,以下是使用Java模拟https请求的代码:
AzkabanHttpsPost类

package hadoop.azkaban;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Properties;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

import net.sf.json.JSONObject;

public class AzkabanHttpsPost {
        // Password used to open the key store and the trust store (property PASSWORD).
        static String keystorePassword;
        // Path of the key store file (property KEYSTORE).
        static String keystore;
        // Path of the trust store file (property TRUSTSTORE).
        static String truststore;

        // Reads the SSL settings from azkaban.properties, looked up through the
        // thread context class loader. Problems are reported on stderr and not
        // rethrown, in which case the fields above stay null.
        static{
                InputStream is=Thread.currentThread().getContextClassLoader().
                getResourceAsStream("azkaban.properties");
                if(is==null){
                        // Report the real cause instead of the NullPointerException
                        // that Properties.load(null) would raise.
                        System.err.println("azkaban.properties not found on the classpath");
                }else{
                        try{
                                Properties p=new Properties();
                                p.load(is);
                                keystorePassword = p.getProperty("PASSWORD");
                                keystore = p.getProperty("KEYSTORE");
                                truststore = p.getProperty("TRUSTSTORE");
                        }catch(Exception e){
                                e.printStackTrace();
                        }finally{
                                // Always release the resource stream, even when loading fails.
                                try{
                                        is.close();
                                }catch(IOException ignored){
                                        // Nothing useful can be done if closing fails.
                                }
                        }
                }
        }

        /**
         * Loads a JKS key store from a file.
         *
         * @param password
         *            password used to check the integrity of the store
         * @param storePath
         *            file system path of the key store
         * @return the loaded key store
         * @throws Exception
         *             if the file cannot be opened or the store cannot be loaded
         */
        public static KeyStore getKeyStore(String password, String storePath)
                        throws Exception {
                // Instantiate a JKS key store
                KeyStore ks = KeyStore.getInstance("JKS");
                // Open the key store file
                FileInputStream is = new FileInputStream(storePath);
                try {
                        // Load the key store contents
                        ks.load(is, password.toCharArray());
                } finally {
                        // Close the file even when loading fails (e.g. wrong password)
                        is.close();
                }
                return ks;
        }
         /**
         * Builds a TLS {@link SSLContext} from the key store and trust store
         * configured in this class's static fields.
         *
         * The same password ({@code keystorePassword}) is used to open both
         * stores and to initialise the key manager factory.
         *
         * @return an initialised SSL context
         * @throws Exception
         *             if a store cannot be loaded or the context cannot be set up
         */
        public static SSLContext getSSLContext() throws Exception {
                // Key managers come from the configured key store.
                KeyManagerFactory kmf = KeyManagerFactory
                                .getInstance(KeyManagerFactory.getDefaultAlgorithm());
                kmf.init(getKeyStore(keystorePassword, keystore),
                                keystorePassword.toCharArray());

                // Trust managers come from the configured trust store.
                TrustManagerFactory tmf = TrustManagerFactory
                                .getInstance(TrustManagerFactory.getDefaultAlgorithm());
                tmf.init(getKeyStore(keystorePassword, truststore));

                // Passing null as the third argument selects the default secure random source.
                SSLContext context = SSLContext.getInstance("TLS");
                context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
                return context;
        }

        /**
         * Installs the process-wide defaults used by {@link HttpsURLConnection}:
         * the socket factory of the context returned by {@link #getSSLContext()}
         * and a {@code MyHostnameVerifier} as hostname verifier.
         *
         * A {@link GeneralSecurityException} raised while building the context is
         * printed and otherwise ignored; in that case the default socket factory
         * is left untouched. Any other exception propagates to the caller.
         *
         * @throws Exception
         *             if building the SSL context fails for a reason other than a
         *             {@link GeneralSecurityException}
         */
        public static void initHttpsURLConnection() throws Exception {
                SSLContext context;
                try {
                        context = getSSLContext();
                } catch (GeneralSecurityException e) {
                        e.printStackTrace();
                        context = null;
                }
                // Only replace the default socket factory when a context was built.
                if (context != null) {
                        HttpsURLConnection.setDefaultSSLSocketFactory(context.getSocketFactory());
                }
                HttpsURLConnection.setDefaultHostnameVerifier(new MyHostnameVerifier());
        }
         /**
         * Sends a form-encoded POST request over HTTPS and parses the response
         * body as JSON.
         *
         * Failures while sending or reading are printed and swallowed; the method
         * then returns {@code null}. Exceptions thrown by
         * {@link #initHttpsURLConnection()} propagate to the caller.
         *
         * @param url
         *            address to post to, e.g. https://localhost:8443
         * @param xmlStr
         *            request body, e.g. action=login&amp;username=azkaban&amp;password=azkaban
         * @return the parsed JSON response, or {@code null} if the request failed
         * @throws Exception
         *             if the HTTPS defaults cannot be initialised
         */
        public static JSONObject post(String url,String xmlStr) throws Exception {
                initHttpsURLConnection();
                JSONObject jsonObj = null;
                HttpsURLConnection urlCon = null;
                BufferedReader in = null;
                try {
                        urlCon = (HttpsURLConnection) (new URL(url)).openConnection();
                        urlCon.setDoInput(true);
                        urlCon.setDoOutput(true);
                        urlCon.setRequestMethod("POST");
                        // With these headers Azkaban recognises the call as an ajax
                        // request and answers with JSON.
                        urlCon.setRequestProperty("Content-Type",
                                        "application/x-www-form-urlencoded");
                        urlCon.setRequestProperty("X-Requested-With", "XMLHttpRequest");
                        urlCon.setUseCaches(true);
                        // Encoding the body as gbk avoids garbled Chinese characters
                        // on the server side.
                        urlCon.getOutputStream().write(xmlStr.getBytes("gbk"));
                        urlCon.getOutputStream().flush();
                        urlCon.getOutputStream().close();
                        in = new BufferedReader(new InputStreamReader(
                                        urlCon.getInputStream()));
                        // Join all response lines (line breaks dropped) before parsing.
                        StringBuilder body = new StringBuilder();
                        String temp;
                        while ((temp = in.readLine()) != null) {
                                body.append(temp);
                        }
                        jsonObj = JSONObject.fromObject(body.toString());
                } catch (Exception e) {
                        // Best effort: report the failure and return null.
                        e.printStackTrace();
                } finally {
                        // Release the response stream and the connection on every path.
                        if (in != null) {
                                try {
                                        in.close();
                                } catch (IOException ignored) {
                                        // Nothing useful can be done if closing fails.
                                }
                        }
                        if (urlCon != null) {
                                urlCon.disconnect();
                        }
                }
                return jsonObj;
        }

MyHostnameVerifier类

package hadoop.azkaban;
import javax.net.ssl.HostnameVerifier;  
import javax.net.ssl.SSLSession;  
  
/**
 * Hostname verifier that accepts connections to "localhost" only.
 * During the handshake, if the URL's hostname and the server's identification
 * hostname do not match, the verification mechanism can call back to an
 * implementation of this interface to decide whether the connection is allowed.
 */
public class MyHostnameVerifier implements HostnameVerifier {
    /**
     * @param hostname the host name being connected to
     * @param session  the SSL session of the connection (not inspected)
     * @return {@code true} only when {@code hostname} is exactly "localhost"
     */
    @Override
    public boolean verify(String hostname, SSLSession session) {
        return "localhost".equals(hostname);
    }
}

以下是调用https,操作Azkaban的示例:

package hadoop.azkaban;

import java.io.InputStream;
import java.util.Properties;

import net.sf.json.JSONObject;

/**
 * 
 * @author hu
 *
 */
public class AzkabanOperator {
        // Base address of the Azkaban web server (property URL).
        public static String url;
        // Login name sent to Azkaban (property AZKABANUSER).
        public static String azkabanUser;
        // Login password sent to Azkaban (property AZKABANPASSWORD).
        public static String azkabanPassword;
        // Value of property GDI_Project.
        public static String GDI_Project;
        // Value of property GDI_Workflow.
        public static String GDI_Workflow;

        // Reads the connection settings from azkaban.properties, looked up through
        // the thread context class loader. Problems are reported on stderr and not
        // rethrown, in which case the fields above stay null.
        static {
                InputStream is = Thread.currentThread().getContextClassLoader()
                                .getResourceAsStream("azkaban.properties");
                if (is == null) {
                        // Report the real cause instead of the NullPointerException
                        // that Properties.load(null) would raise.
                        System.err.println("azkaban.properties not found on the classpath");
                } else {
                        try {
                                Properties p = new Properties();
                                p.load(is);
                                url = p.getProperty("URL");
                                azkabanUser = p.getProperty("AZKABANUSER");
                                azkabanPassword = p.getProperty("AZKABANPASSWORD");
                                GDI_Project = p.getProperty("GDI_Project");
                                GDI_Workflow = p.getProperty("GDI_Workflow");
                        } catch (Exception e) {
                                e.printStackTrace();
                        } finally {
                                // Always release the resource stream, even when loading fails.
                                try {
                                        is.close();
                                } catch (Exception ignored) {
                                        // Nothing useful can be done if closing fails.
                                }
                        }
                }
        }

        /**
         * Posts a login request built from the configured user name and password
         * to the configured base URL.
         *
         * NOTE(review): the credentials are appended to the form body as-is,
         * without URL-encoding.
         *
         * @return whatever {@code AzkabanHttpsPost.post} returns for the request
         * @throws Exception
         *             if {@code AzkabanHttpsPost.post} throws
         */
        public JSONObject login() throws Exception {
                StringBuilder form = new StringBuilder("action=login");
                form.append("&username=").append(azkabanUser);
                form.append("&password=").append(azkabanPassword);
                return AzkabanHttpsPost.post(url, form.toString());
        }
         /**
         * Posts an {@code executeFlow} request to {@code <url>/executor}, overriding
         * the flow parameters {@code cw_params}, {@code sm_params} and
         * {@code gdi_params}.
         *
         * NOTE(review): all values are appended to the form body as-is, without
         * URL-encoding.
         *
         * @param sessionID session id sent as {@code session.id}
         * @param project   project name sent as {@code project}
         * @param flow      flow name sent as {@code flow}
         * @param cwParams  value for {@code flowOverride[cw_params]}
         * @param smParams  value for {@code flowOverride[sm_params]}
         * @param gdiParams value for {@code flowOverride[gdi_params]}
         * @return whatever {@code AzkabanHttpsPost.post} returns for the request
         * @throws Exception
         *             if {@code AzkabanHttpsPost.post} throws
         */
        public JSONObject executeGDIFlow(String sessionID, String project,
                        String flow, String cwParams, String smParams, String gdiParams)
                        throws Exception {
                StringBuilder form = new StringBuilder();
                form.append("session.id=").append(sessionID);
                form.append("&ajax=executeFlow");
                form.append("&project=").append(project);
                form.append("&flow=").append(flow);
                form.append("&flowOverride[cw_params]=").append(cwParams);
                form.append("&flowOverride[sm_params]=").append(smParams);
                form.append("&flowOverride[gdi_params]=").append(gdiParams);
                return AzkabanHttpsPost.post(url + "/executor", form.toString());
        }

        /**
         * Posts a {@code fetchexecflow} request for one execution to
         * {@code <url>/executor}.
         *
         * @param sessionID session id sent as {@code session.id}
         * @param execID    execution id sent as {@code execid}
         * @return whatever {@code AzkabanHttpsPost.post} returns for the request
         * @throws Exception
         *             if {@code AzkabanHttpsPost.post} throws
         */
        public JSONObject fetchFlow(String sessionID, String execID)
                        throws Exception {
                StringBuilder form = new StringBuilder();
                form.append("session.id=").append(sessionID);
                form.append("&ajax=fetchexecflow");
                form.append("&execid=").append(execID);
                return AzkabanHttpsPost.post(url + "/executor", form.toString());
        }

相关内容