Oracel 存储过程-通用ETL实现
数据规格化 处理自动化 信息集中化 操作人性化 架构
通过视图实现来至两个不同数据库的表的结构完全一致, 在结构完全相同的两个表之间进行数据同步, 问题变得相当简单. 同步代码如下. ETL
---初始同步
delete from ods_table;
insert into v_table
select * from db_table; commit;
---新增同步
insert into v_table
select * from db_table t
where not in (select id from v_table); 1
-
commit;
---变更同步
update ods_table t
set = (select from db_table db where =
where != (select from db_table db where = ; commit;
实现两个表结构完全一致的方法如下 ---建表
CREATE SEQUENCE SEQ_ETL_INCREASE_ID
INCREMENT BY 1
START WITH 1
NOCACHE;
/*==============================================================*/
/* Table: ETL_TABLES */
/*==============================================================*/
CREATE TABLE ETL_TABLES ( 2
-
\"ID\" NUMBER DEFAULT -1 NOT NULL,
\"TABLE_NAME\" VARCHAR2(100) NOT NULL,
\"TABLE_TYPE\" VARCHAR2(30) NOT NULL,
\"TABLE_ROOT_IN\" VARCHAR2(30),
\"TABLE_NEED_CREATE_VIEW\" NUMBER DEFAULT 1,
\"TABLE_CREATE_VIEW_NAME_PREFIX\" VARCHAR2(30) DEFAULT 'v',
\"DB_LINK_NAME\" VARCHAR2(100),
\"CURRENT_VERSION\" NUMBER DEFAULT 1 NOT NULL,
\"VERSION_HISTORY\" VARCHAR2(3000) DEFAULT 'init input' NOT NULL,
\"DEVELOP_DATE\" DATE DEFAULT SYSDATE NOT NULL,
\"DEVELOP_BY\" VARCHAR2(100) DEFAULT 'cyyan@isoftstone' NOT NULL,
\"LAST_MAINTAIN_DATE\" DATE DEFAULT SYSDATE NOT NULL,
\"LAST_MAINTAIN_BY\"
VARCHAR2(100)
DEFAULT
'cyyan@isoftstone' NOT NULL,
\"MEMO\" VARCHAR2(500),
\"STATUS\" NUMBER DEFAULT 1, 3
-
CONSTRAINT PK_ETL_TABLES PRIMARY KEY (\"ID\") );
COMMENT ON TABLE ETL_TABLES IS
'此表用于维护ETL涉及到所有表, 包括:
1, db---业务系统数据库
2, ods---操作数据数据库
3, dw---数据仓库';
/*==============================================================*/
/* Table: ETL_VIEWS */
/*==============================================================*/
CREATE TABLE ETL_VIEWS (
\"ID\" NUMBER DEFAULT -1 NOT NULL,
\"VIEW_NAME\" VARCHAR2(100) NOT NULL,
\"VIEW_TYPE\" VARCHAR2(30) NOT NULL,
\"VIEW_ROOT_IN\" VARCHAR2(30),
\"VIEW_SELECT\" VARCHAR2(4000) NOT NULL, 4
-
\"VIEW_FROM\" VARCHAR2(600) NOT NULL,
\"VIEW_WHERE\" VARCHAR2(2000),
\"VIEW_ORDER_BY\" VARCHAR2(600),
\"VIEW_GROUP_BY\" VARCHAR2(600),
\"VIEW_HAVING\" VARCHAR2(600),
\"VIEW_DB_LINK_NAME\" VARCHAR2(100),
\"CURRENT_VERSION\" NUMBER DEFAULT 1 NOT NULL,
\"VERSION_HISTORY\" VARCHAR2(3000) DEFAULT 'init input' NOT NULL,
\"DEVELOP_DATE\" DATE DEFAULT SYSDATE NOT NULL,
\"DEVELOP_BY\" VARCHAR2(100) DEFAULT 'cyyan@isoftstone' NOT NULL,
\"LAST_MAINTAIN_DATE\" DATE DEFAULT SYSDATE NOT NULL,
\"LAST_MAINTAIN_BY\"
VARCHAR2(100)
DEFAULT
'cyyan@isoftstone' NOT NULL,
\"MEMO\" VARCHAR2(500),
\"STATUS\" NUMBER DEFAULT 1,
CONSTRAINT PK_ETL_VIEWS PRIMARY KEY (\"ID\") ); 5
-
COMMENT ON TABLE ETL_VIEWS IS
'此表用于维护ETL涉及到所有视图, 包括:
1, v1---db表中与ods对应到视图
2, v2---ods表中与db对应到视图
3, v3---ods表中与dw对应到视图
4, v4---dw表中与ods中对应到视图';
/*==============================================================*/
/* Table: ETLS */
/*==============================================================*/
CREATE TABLE ETLS (
\"ID\" NUMBER NOT NULL,
\"ETL_NAME\" VARCHAR2(300) NOT NULL,
\"ETL_TYPE\" VARCHAR2(30) NOT NULL,
\"ETL_SRC_VIEW_OR_TABLE\" NUMBER NOT NULL,
\"ETL_DES_VIEW_OR_TABLE\" NUMBER NOT NULL,
\"ETL_INIT_ENABLE\" NUMBER(1) DEFAULT 1 NOT NULL, 6
-
\"ETL_ADD_ENABLE\" NUMBER(1) DEFAULT 1 NOT NULL,
\"ETL_CHARGE_ENABLE\" NUMBER(1) DEFAULT 1 NOT NULL,
\"CURRENT_VERSION\" NUMBER DEFAULT 1 NOT NULL,
\"VERSION_HISTORY\" VARCHAR2(3000) DEFAULT 'init input' NOT NULL,
\"DEVELOP_DATE\" DATE
\"DEVELOP_BY\" VARCHAR2(100) NOT NULL,
\"LAST_MAINTAIN_DATE\" DATE NULL,
\"LAST_MAINTAIN_BY\"
VARCHAR2(100)
'cyyan@isoftstone' NOT NULL,
\"MEMO\" VARCHAR2(500),
\"STATUS\" NUMBER
CONSTRAINT PK_ETLS PRIMARY KEY (\"ID\") );
COMMENT ON TABLE ETLS IS
'此表用于维护ETL转换时设计到源表和目的表 7
DEFAULT SYSDATE NOT NULL, DEFAULT 'cyyan@isoftstone' DEFAULT SYSDATE NOT
DEFAULT
DEFAULT 1, -
源表(或视图)--->目的表(或视图)-
(推荐全部使用视图, 视图具有更过到灵活性, 而且更统一)
整体架构是在完全相同两张表(或视图)之间进行同步处理 规范:
1, 源表(或视图)-和目的表(或视图)-完全相同
2, 目的视图必须是单表';
--存储过程
/*==============================================================*/ /* Database name: %DATABASE% */ /* DBMS name: ORACLE Version 10g */ /* Created on: 2009-2-1 23:29:27 */
/*==============================================================*/
-- INTEGRITY PACKAGE DECLARATION 8
-
CREATE OR REPLACE PACKAGE INTEGRITYPACKAGE AS PROCEDURE INITNESTLEVEL;
FUNCTION GETNESTLEVEL RETURN NUMBER; PROCEDURE NEXTNESTLEVEL; PROCEDURE PREVIOUSNESTLEVEL; END INTEGRITYPACKAGE; /
-- INTEGRITY PACKAGE DEFINITION
CREATE OR REPLACE PACKAGE BODY INTEGRITYPACKAGE AS NESTLEVEL NUMBER;
-- PROCEDURE TO INITIALIZE THE TRIGGER NEST LEVEL PROCEDURE INITNESTLEVEL IS BEGIN
NESTLEVEL := 0; END;
-- FUNCTION TO RETURN THE TRIGGER NEST LEVEL FUNCTION GETNESTLEVEL RETURN NUMBER IS BEGIN
IF NESTLEVEL IS NULL THEN NESTLEVEL := 0; END IF;
RETURN(NESTLEVEL); END;
-- PROCEDURE TO INCREASE THE TRIGGER NEST LEVEL PROCEDURE NEXTNESTLEVEL IS BEGIN 9
-
IF NESTLEVEL IS NULL THEN NESTLEVEL := 0; END IF;
NESTLEVEL := NESTLEVEL + 1; END;
-- PROCEDURE TO DECREASE THE TRIGGER NEST LEVEL PROCEDURE PREVIOUSNESTLEVEL IS BEGIN
NESTLEVEL := NESTLEVEL - 1; END;
END INTEGRITYPACKAGE; /
CREATE OR REPLACE PROCEDURE PRO_CREATE_VIEW_BY_ETL_VIEWS AS
--------------PRO_CREATE_VIEW_BY_ETL_VIEWS------------------------ -- CREATED ON 2009-2-1 BY CYYAN@ISOFTSTONE -- 功能 : 根据ETL_VIEWS中到数据生成视图
------------------------------------------------------------------------------ 10
-
VIEW_CREATE_CODE VARCHAR2(10000); --生成视图到代码
VIEW_NAME VARCHAR2(100); --视图名称
VIEW_SELECT VARCHAR2(4000); --视图的SELECT部分 VIEW_FROM VARCHAR2(300); --视图的FROM部分 VIEW_WHERE VARCHAR2(3000); --视图的WHERE部分 VIEW_ORDER_BY VARCHAR2(600); --视图的ORDER BY部分 VIEW_GROUP_BY VARCHAR2(600); --视图的GROUP BY部分 VIEW_HAVING VARCHAR2(600); --视图的HAVING部分 VIEW_DB_LINK_NAME VARCHAR2(100); --视图的DB LINK部分
ROW_COUNT NUMBER; --行数
CURSOR ETL_VIEWS_CURSOR IS --提取创建视图需要到信息
SELECT
VIEW_NAME,
VIEW_SELECT,
VIEW_FROM,
VIEW_WHERE,
VIEW_ORDER_BY, VIEW_GROUP_BY, VIEW_HAVING, VIEW_DB_LINK_NAME FROM ETL_VIEWS T WHERE = (SELECT MAX FROM ETL_VIEWS T2 WHERE = ; BEGIN -- 统计行数
SELECT COUNT(*) INTO ROW_COUNT FROM ETL_VIEWS T WHERE = (SELECT MAX FROM ETL_VIEWS T2 WHERE = ;
OPEN ETL_VIEWS_CURSOR; --打开游标 FOR I IN 1 .. ROW_COUNT LOOP --遍历 FETCH ETL_VIEWS_CURSOR
INTO
VIEW_NAME,
VIEW_SELECT,
VIEW_FROM,
VIEW_WHERE,
VIEW_ORDER_BY, VIEW_GROUP_BY, VIEW_HAVING, VIEW_DB_LINK_NAME; 11
-
---拼接创建视图到语句
VIEW_CREATE_CODE := 'create or replace view ' || VIEW_NAME || ' as select ' || VIEW_SELECT || ' from ' || VIEW_FROM;
IF VIEW_DB_LINK_NAME IS NOT NULL THEN
VIEW_CREATE_CODE
:=
VIEW_CREATE_CODE
||
'@'
||
VIEW_DB_LINK_NAME; END IF;
IF VIEW_WHERE IS NOT NULL THEN
VIEW_CREATE_CODE := VIEW_CREATE_CODE || ' where ' || VIEW_WHERE; END IF;
IF VIEW_ORDER_BY IS NOT NULL THEN
VIEW_CREATE_CODE := VIEW_CREATE_CODE || ' order by ' || VIEW_ORDER_BY; END IF;
IF VIEW_GROUP_BY IS NOT NULL THEN
VIEW_CREATE_CODE := VIEW_CREATE_CODE || ' group by ' || VIEW_GROUP_BY; END IF;
IF VIEW_HAVING IS NOT NULL THEN
VIEW_CREATE_CODE := VIEW_CREATE_CODE || ' having ' || VIEW_HAVING; END IF; 12
-
--输出创建语句
(VIEW_CREATE_CODE); ('');
--执行创建视图
EXECUTE IMMEDIATE VIEW_CREATE_CODE; END LOOP;
CLOSE ETL_VIEWS_CURSOR; --关闭游标 END; /
CREATE OR REPLACE PROCEDURE PRO_INSERT_INTO_ETL_VIEWS AS
--ADD BY CYYAN@ISOFTSTONE --2009年2月1日21:33:37
---此存储过程用于 将ETL_TABLE中标识需要创建VIEW 到TABLE, 进行自动提起转换到ETL_VIEWS中.
--处理过程用到啦系统表COL从此表中获取列名
TABLE_NAME VARCHAR2(100); --表名 COL_NAME VARCHAR2(100); --列名
TABLE_COUNT NUMBER; --表到行数 --COL_COUNT NUMBER; --列数
ETL_VIEWS_INSERT_CODE VARCHAR2(600); --插入语句到 INSERT部分 ETL_VIEWS_VALUES_CODE VARCHAR2(16000); --插入语句到VALUES部分 13
-
--ETL_VIEWS的到列
VIEW_NAME_PREFIX VARCHAR2(30);--实体名到前缀 TABLE_TYPE VARCHAR2(30); --表类型 如 DB, ODS, DW
TABLE_ROOT_IN VARCHAR2(30); --表来源, 来自那个系统, 如资金系统\"NHZJ\财务系统\"NHCW\"
VIEW_SELECT VARCHAR2(10000); --VIEW 语句到SELECT部分, 这个需要遍历一个表到所有列
DB_LINK_NAME VARCHAR2(100);
CURRENT_VERSION VARCHAR2(600); --版本部分, 这里没更新, 只要全部删除, 或不断插入, 此字段定义了版本, 没有变更都形成新到版本, 取值是取最大值
CURSOR_NUMBER NUMBER; COL_SELECT_SQL VARCHAR2(100); RETURN_VALUE NUMBER;
--从ETL_TABLES中查询需要生成视图到表 CURSOR DB_TABLES_CURSOR IS
SELECT UPPER(TABLE_NAME), , , , DB_LINK_NAME FROM ETL_TABLES T WHERE (UPPER = 'DB' OR UPPER = 'DW' ) AND = 1;
--CURSOR_NUMBER NUMBER; --游标 OLD WAY 执行需要, NEW WAY 不需要 --RETURN_VALUE NUMBER; --执行后返回值 OLD WAY 执行需要, NEW WAY 不需要 BEGIN
-- TEST STATEMENTS HERE
SELECT COUNT(*) INTO TABLE_COUNT FROM ETL_TABLES T WHERE (UPPER = 'DB' OR UPPER = 'DW' ) AND = 1;
--构造INSERT部分
ETL_VIEWS_INSERT_CODE :=
'insert
into
etl_views(view_name,
view_type,
view_root_in, view_select, view_from, current_version, VIEW_DB_LINK_NAME) '; 14
-
OPEN DB_TABLES_CURSOR;
FOR I IN 1 .. TABLE_COUNT LOOP --表遍历 FETCH DB_TABLES_CURSOR
INTO TABLE_NAME, TABLE_TYPE, TABLE_ROOT_IN, VIEW_NAME_PREFIX, DB_LINK_NAME; --构造VALUES部分
ETL_VIEWS_VALUES_CODE :=
'values('''
||
VIEW_NAME_PREFIX
||
TABLE_NAME || ''', ''' || TABLE_TYPE || ''', ''' || TABLE_ROOT_IN || '''';
(TABLE_NAME);
/* 使用CURSOR遍历列到方法 不适用于DB_LINK --准备遍历列
SELECT COUNT(*) INTO COL_COUNT FROM COL@DB_LINK_NHZJ WHERE = UPPER(TABLE_NAME);
(' table has ' || COL_COUNT || ' cols'); DECLARE
CURSOR COLS_CURSOR IS
SELECT FROM COL@DB_LINK_NHZJ C WHERE = UPPER(TABLE_NAME);
BEGIN
OPEN COLS_CURSOR; VIEW_SELECT := '';
--下面用逗号拼接列
FETCH COLS_CURSOR --遍历第一列 INTO COL_NAME;
VIEW_SELECT := VIEW_SELECT || COL_NAME; 15
-
FOR J IN 2 .. COL_COUNT LOOP --遍历后面到列 FETCH COLS_CURSOR INTO COL_NAME; (' ' || COL_NAME);
VIEW_SELECT := VIEW_SELECT || ', ' || COL_NAME; END LOOP;
CLOSE COLS_CURSOR; END;
(VIEW_SELECT);
-- (ETL_VIEWS_VALUES_CODE); */
/* 使用 DBMS_SQL */
-- ANOTHER WAY USER DBMS_SQL PACKAGE
COL_SELECT_SQL := 'select from @' || DB_LINK_NAME || ' T where TABLE_NAME || '''';
--SQL_CODE := 'select from T where = ''' || TABLE_NAME || '''';
CURSOR_NUMBER := ();
(CURSOR_NUMBER, COL_SELECT_SQL, ;
(CURSOR_NUMBER,1,COL_NAME, 100);
RETURN_VALUE := (CURSOR_NUMBER); 16
= ''' || -
(' RETURN_VALUE = ' || RETURN_VALUE);
RETURN_VALUE := (CURSOR_NUMBER); --获取第一列 (CURSOR_NUMBER,1,COL_NAME); VIEW_SELECT := COL_NAME;
WHILE (CURSOR_NUMBER)<>0 LOOP ---遍历其它到列
(CURSOR_NUMBER,1,COL_NAME); (COL_NAME);
VIEW_SELECT := VIEW_SELECT || ', ' || COL_NAME; END LOOP;
-- ('VIEW_SELECT : ' || VIEW_SELECT);
(CURSOR_NUMBER);
--生成最新到版本号: 视图名称是唯一的
SELECT NVL(MAX(CURRENT_VERSION),0) + 1 INTO CURRENT_VERSION FROM ETL_VIEWS V WHERE = VIEW_NAME_PREFIX || TABLE_NAME;
ETL_VIEWS_VALUES_CODE := ETL_VIEWS_VALUES_CODE || CHR(10) || ', ''' || VIEW_SELECT || '''' || CHR(10) || ', ''' || TABLE_NAME || ''', ''' || CURRENT_VERSION || ''', ''' || DB_LINK_NAME || ''')';
--输出插入到语句
(ETL_VIEWS_INSERT_CODE); (ETL_VIEWS_VALUES_CODE); (''); 17
-
--DBMS_STANDARD.
--执行插入语句 -- NEW WAY
EXECUTE
IMMEDIATE
ETL_VIEWS_INSERT_CODE
||
ETL_VIEWS_VALUES_CODE; /*
-- OLD WAY
CURSOR_NUMBER := ();
(CURSOR_NUMBER, ETL_VIEWS_INSERT_CODE, ; RETURN_VALUE := (CURSOR_NUMBER); (CURSOR_NUMBER); */
END LOOP; COMMIT; --提交
CLOSE DB_TABLES_CURSOR; --关闭游标 --EXCEPTION --ROLLBACK; END; /
CREATE OR REPLACE TRIGGER TRG_ID_ON_ETLS BEFORE INSERT ON ETLS FOR EACH ROW DECLARE -- NOTHING BEGIN
SELECT INTO : FROM DUAL; END TRIGGER_ID_INCREASE; / 18
-
CREATE OR REPLACE TRIGGER TRG_ID_ON_ETL_TABLES BEFORE INSERT ON ETL_TABLES FOR EACH ROW DECLARE -- NOTHING BEGIN
SELECT INTO : FROM DUAL; END TRIGGER_ID_INCREASE; /
CREATE OR REPLACE TRIGGER TRG_ID_ON_ETL_VIEWS BEFORE INSERT ON ETL_VIEWS FOR EACH ROW DECLARE -- NOTHING BEGIN
SELECT INTO : FROM DUAL; END TRIGGER_ID_INCREASE; /
19
因篇幅问题不能全部显示,请点此查看更多更全内容