Databus Relays

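Overview

A Databus relay has two main responsibilities:

  • Reading changed rows from the Databus source database and serializing them into an event stream held in memory;
  • Accepting client requests and returning the stream of change events to the clients.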

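Architecture

  • Event Producer: reads change events from the database, converts them to Avro, and stores them in memory;
  • Circular Buffer: the relay has one or more circular buffers that hold change events ordered by increasing system change number (SCN);
  • SCN Writer/Reader: writes the SCN to disk and reads it back;
  • RESTful interface: exposes a RESTful endpoint through which change events are delivered to clients.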
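To make the Circular Buffer's role concrete, here is a minimal Python sketch of a bounded buffer that keeps events in increasing SCN order and lets a reader ask for everything after a given SCN. It is an illustration under stated assumptions, not Databus's actual buffer (which is written in Java and works on serialized bytes); `ChangeEvent`, `CircularEventBuffer` and `read_since` are hypothetical names.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ChangeEvent:
    scn: int        # system change number of the originating transaction
    source_id: int  # logical source id (hypothetical field), e.g. the sy$sources value
    payload: bytes  # serialized (e.g. Avro) row image


class CircularEventBuffer:
    """Bounded buffer of change events kept in non-decreasing SCN order.

    Once full, each append evicts the oldest event, like a ring buffer.
    """

    def __init__(self, capacity: int) -> None:
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._capacity = capacity
        self._events: deque = deque()
        self._last_evicted_scn: Optional[int] = None

    def append(self, event: ChangeEvent) -> None:
        # Equal SCNs are allowed: one transaction can change several rows.
        if self._events and event.scn < self._events[-1].scn:
            raise ValueError("events must be appended in SCN order")
        if len(self._events) == self._capacity:
            # Overwrite the oldest slot and remember how far eviction reached.
            self._last_evicted_scn = self._events.popleft().scn
        self._events.append(event)

    def read_since(self, scn: int) -> list[ChangeEvent]:
        """Return buffered events with SCN strictly greater than `scn`.

        Raises LookupError if events the caller has not seen were evicted.
        """
        if self._last_evicted_scn is not None and scn < self._last_evicted_scn:
            raise LookupError(f"SCN {scn} is older than the buffer window")
        return [e for e in self._events if e.scn > scn]
```

A reader that has consumed everything up to SCN 11 calls `read_since(11)`; a reader whose last SCN has already been overwritten gets an error instead of silently missing events.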

Data capture

Two kinds of data source are currently supported: Oracle and MySQL.

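Oracle data capture

Oracle capture works by adding a trigger to the source table. On every insert and update, the trigger stamps the row with a transaction number (SCN) that serves as the query key, and the relay periodically queries for rows whose number has advanced to pick up the changes. The trigger fires only on INSERT and UPDATE, so DELETE and SELECT statements are unaffected, which also means physical deletes are not captured.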
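The capture loop can be modelled in a few lines of Python. This is a simplified sketch under stated assumptions: an in-memory dict stands in for the source table, a counter stands in for `databus.sync_core.getTxn`, and `poll_changes` is a hypothetical name for one relay polling cycle (the real relay issues SQL against the sy$ view). It also shows why deletes are invisible: a deleted row never receives a new txn.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from itertools import count


@dataclass
class Row:
    id: int
    first_name: str
    txn: int = 0


@dataclass
class SourceTable:
    """In-memory stand-in for the source table plus its trigger."""
    rows: dict = field(default_factory=dict)
    _txn_counter: count = field(default_factory=lambda: count(1), repr=False)

    def upsert(self, row: Row) -> None:
        # Plays the role of the BEFORE INSERT OR UPDATE trigger: every
        # insert or update stamps the row with a fresh, increasing number.
        row.txn = next(self._txn_counter)
        self.rows[row.id] = row

    def delete(self, row_id: int) -> None:
        # No trigger fires on DELETE, so the row simply disappears.
        self.rows.pop(row_id, None)


def poll_changes(table: SourceTable, last_txn: int) -> tuple[list[Row], int]:
    """One polling cycle: fetch rows stamped after last_txn, in txn order."""
    changed = sorted(
        (r for r in table.rows.values() if r.txn > last_txn),
        key=lambda r: r.txn,
    )
    new_last = changed[-1].txn if changed else last_txn
    return changed, new_last
```

Each cycle remembers the highest txn it has seen and asks only for rows above it, which is why the TXN column is indexed in step 6 below.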

Oracle database configuration

A first-time Databus deployment consists of the following steps (for subsequent deployments, start from step 3):

  1. Create the databus tablespaces and user, and grant privileges
    createUser.sh
    -- Create the databus tablespaces and the databus user, and grant privileges to databus (see createUser.sql).
    -- A datafile must be specified; if you rename the tablespaces, update the tablespace file as well.
    create tablespace TBS_DATABUS datafile '${DBDIR}/tbs_databus_01.dbf'
      size 50M reuse autoextend on next 50M maxsize unlimited
      extent management local uniform size 2M;
    create tablespace TBS_DATABUS_IDX datafile '${DBDIR}/tbs_databus_idx_01.dbf'
      size 50M reuse autoextend on next 50M maxsize unlimited
      extent management local uniform size 2M;
    create user DATABUS identified by DATABUS
      default tablespace TBS_DATABUS
      temporary tablespace temp1;
    grant create session, create table, create view, create sequence,
      create procedure, create trigger, create type, create job to DATABUS;
    grant query rewrite to DATABUS;
    grant execute on dbms_alert to DATABUS;
    grant execute on sys.dbms_lock to DATABUS;
    grant select on sys.v_$database to DATABUS;
    grant execute on sys.dbms_aq to DATABUS;
    grant execute on sys.dbms_aqadm to DATABUS;
    grant execute on sys.dbms_aqin to DATABUS;
    grant execute on sys.dbms_aq_bqview to DATABUS;
    alter user DATABUS quota unlimited on TBS_DATABUS;
    alter user DATABUS quota unlimited on TBS_DATABUS_IDX;

  2. Create the Databus schema objects with the setup shell script. The script takes the connection string ${DB} in the form user/password@SID and splits it (with perl) into user, password and SID. For each *.tab file it creates the table through sqlplus (printing "Creating Table ...") and sets up alerts for it; for each *.view file it creates the view. View names must start with sy$.
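  3. Add a TXN NUMBER column to the source table (as posp_boss). The example below creates the table with TXN included; for an existing table, add the column with ALTER TABLE.
    person.sql
    -- Create the table, including the TXN column
    create table posp_boss.person (
      id         number primary key,
      first_name varchar(120) not null,
      last_name  varchar(120) not null,
      birth_date date,
      deleted    varchar(5) default 'false' not null,
      txn        number
    );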

  4. Grant privileges on the source table to databus (as posp_boss)
    grant.sql
    grant insert, update, select on posp_boss.PERSON to databus;

  5. Grant posp_boss execute privilege on the databus.sync_core package (as databus)
    grant.sql
    grant execute on databus.sync_core to posp_boss;

  6. Create an index on the TXN column (as posp_boss)
    index.sql
    -- Create the index (as posp_boss)
    create index posp_boss.PERSON_txn on POSP_BOSS.PERSON(txn) tablespace index_data;

  7. Create a view on the table. The view must include the TXN column, and ID must be aliased as KEY.
    book_vw.sql
    -- Map the primary key ID to KEY
    CREATE OR REPLACE FORCE VIEW sy$person AS
    SELECT
      txn,
      id key,
      first_name,
      last_name,
      birth_date,
      deleted
    FROM posp_boss.person;

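  8. Register the source in the sy$sources table. The value must be less than or equal to 125.
    insert.sql
    -- sourceName is case-sensitive and must match the POSP_BOSS.PERSON argument of sync_core.getTxn('POSP_BOSS.PERSON') in the trigger below
    insert into databus.sy$sources values ('POSP_BOSS.PERSON', 1);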

  9. Create the trigger
    trigger.sql
    -- The source name must match the value inserted into sy$sources
    CREATE TRIGGER PERSON_TRG
    BEFORE INSERT OR UPDATE ON POSP_BOSS.PERSON
    REFERENCING OLD AS old NEW AS new
    FOR EACH ROW
    BEGIN
      IF (UPDATING AND :new.txn < 0) THEN
        :new.txn := -:new.txn;
      ELSE
        :new.txn := databus.sync_core.getTxn('POSP_BOSS.PERSON');
      END IF;
    END;

    This completes the database-side configuration for Oracle capture.
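Several of the steps above must agree with each other: the sy$sources value limit (step 8), the case-sensitive source name shared by sy$sources and the trigger (steps 8 and 9), the sy$ prefix on view names (step 2), and the TXN and KEY columns in the view (step 7). A hypothetical pre-deployment check in Python might look like the following; `check_oracle_source` and the regular expression used to locate the getTxn call are illustrative assumptions, not part of Databus.

```python
from __future__ import annotations

import re

MAX_SOURCE_VALUE = 125  # upper limit stated for sy$sources values

# PL/SQL identifiers are case-insensitive, so match getTxn in any case,
# but capture the quoted source name exactly as written.
GET_TXN_CALL = re.compile(r"getTxn\(\s*'([^']*)'\s*\)", re.IGNORECASE)


def check_oracle_source(
    source_name: str,
    source_value: int,
    trigger_sql: str,
    view_name: str,
    view_columns: list[str],
) -> list[str]:
    """Return a list of problems; an empty list means every check passed."""
    problems = []
    if source_value > MAX_SOURCE_VALUE:
        problems.append(f"sy$sources value {source_value} exceeds {MAX_SOURCE_VALUE}")
    # The source name is compared case-sensitively, as required by step 8.
    if source_name not in GET_TXN_CALL.findall(trigger_sql):
        problems.append(f"trigger does not call getTxn('{source_name}')")
    if not view_name.lower().startswith("sy$"):
        problems.append(f"view name {view_name!r} should start with sy$")
    columns = {c.lower() for c in view_columns}
    for required in ("txn", "key"):
        if required not in columns:
            problems.append(f"view is missing the {required.upper()} column")
    return problems
```

Running it against the person example with `("POSP_BOSS.PERSON", 1, <trigger text>, "sy$person", ["txn", "key", ...])` should report no problems, while a lower-case source name would be flagged as a mismatch.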

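MySQL data capture

Setting up MySQL capture is comparatively simple:

  • Create a replication (slave) account, because binlog parsing is built on MySQL's master/slave replication mechanism.
  • Enable the binlog and set its base name (mysql-bin here); the name is needed later in the relay URI. The binlog is off by default in MySQL versions before 8.0, and MySQL must be restarted after enabling it.
  • Set the binlog format to ROW (binlog_format = ROW); before MySQL 5.7.7 the default is STATEMENT. Only ROW format logs the changed rows themselves, and by default Databus consumes only row-change events.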
    my.cnf
    [mysqld]
    server-id = 1
    log_bin = mysql-bin
    expire_logs_days = 10
    max_binlog_size = 100M
    binlog_format = ROW

  • Configure the data source. The source id must match the value in sy$sources.
    sources.json
    {
      "name": "boss",
      "id": 1,
      "uri": "mysql://repl%2F123456@localhost:3306/1/mysql-bin",
      "slowSourceQueryThreshold": 2000,
      "sources": [
        {
          "id": 1,
          "name": "com.linkedin.events.example.person.Person",
          "uri": "lijiang.person",
          "partitionFunction": "constant:1"
        }
      ]
    }

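    The uri has the form mysql://user%2Fpassword@host:port/serverID/binlogFileName, where %2F is the URL-encoded "/" separating the user name from the password.
    Note also that the uri of each entry under sources must include the database name, in the form db.table.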

  • For MySQL capture, many column types are converted to string during Avro serialization.
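The two URI rules above can be checked mechanically. The following Python sketch splits the top-level MySQL URI into its parts and verifies that a source uri has the db.table form. `parse_mysql_uri`, `split_source_uri` and `MysqlRelayUri` are hypothetical helpers written for illustration; they are not Databus's own URI parser.

```python
from __future__ import annotations

from dataclasses import dataclass
from urllib.parse import unquote, urlsplit


@dataclass(frozen=True)
class MysqlRelayUri:
    user: str
    password: str
    host: str
    port: int
    server_id: int
    binlog_name: str


def parse_mysql_uri(uri: str) -> MysqlRelayUri:
    """Split mysql://user%2Fpassword@host:port/serverID/binlogFileName."""
    parts = urlsplit(uri)
    if parts.scheme != "mysql":
        raise ValueError(f"expected a mysql:// URI, got {uri!r}")
    # urlsplit leaves the userinfo percent-encoded; decode %2F back to '/'.
    user, sep, password = unquote(parts.username or "").partition("/")
    if not sep:
        raise ValueError("userinfo must be user%2Fpassword")
    if parts.hostname is None or parts.port is None:
        raise ValueError("host and port are required")
    segments = parts.path.strip("/").split("/")
    if len(segments) != 2:
        raise ValueError("path must be /serverID/binlogFileName")
    server_id, binlog_name = segments
    return MysqlRelayUri(user, password, parts.hostname, parts.port,
                         int(server_id), binlog_name)


def split_source_uri(source_uri: str) -> tuple[str, str]:
    """Check that a source uri has the db.table form and return both parts."""
    db, dot, table = source_uri.partition(".")
    if not (db and dot and table):
        raise ValueError(f"source uri {source_uri!r} must be db.table")
    return db, table
```

For the configuration above, `parse_mysql_uri("mysql://repl%2F123456@localhost:3306/1/mysql-bin")` yields user `repl`, password `123456`, server ID `1` and binlog name `mysql-bin`, and `split_source_uri("lijiang.person")` returns `("lijiang", "person")`.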

Deploying normal_relay

  1. Configure the relay sources. The source id must match the value in sy$sources. Note that the Oracle and MySQL configurations differ.
    source.json
    # oracle
    {
      "name": "person",
      "id": 1,
      "uri": "jdbc:oracle:thin:lijiang/lijiang@192.168.16.239:51521:afc1",
      "slowSourceQueryThreshold": 2000,
      "sources": [
        {
          "id": 1,
          "name": "com.linkedin.events.example.person.Person",
          "uri": "lijiang.person",
          "partitionFunction": "constant:1"
        }
      ]
    }

    # mysql
    {
      "name": "boss",
      "id": 1,
      "uri": "mysql://repl%2F123456@localhost:3306/1/mysql-bin",
      "slowSourceQueryThreshold": 2000,
      "sources": [
        {
          "id": 1,
          "name": "com.linkedin.events.example.person.Person",
          "uri": "lijiang.person",
          "partitionFunction": "constant:1"
        }
      ]
    }

  2. Add the Avro schema file to the schemas_registry folder. For details on Avro, see the Apache Avro documentation.
    book.avsc
    {
      "name": "Person_V1",
      "doc": "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST",
      "type": "record",
      "meta": "dbFieldName=sy$person;pk=key;",
      "namespace": "com.linkedin.events.example.person",
      "fields": [ {
        "name": "txn",
        "type": [ "long", "null" ],
        "meta": "dbFieldName=TXN;dbFieldPosition=0;"
      }, {
        "name": "key",
        "type": [ "long", "null" ],
        "meta": "dbFieldName=KEY;dbFieldPosition=1;"
      }, {
        "name": "firstName",
        "type": [ "string", "null" ],
        "meta": "dbFieldName=FIRST_NAME;dbFieldPosition=2;"
      }, {
        "name": "lastName",
        "type": [ "string", "null" ],
        "meta": "dbFieldName=LAST_NAME;dbFieldPosition=3;"
      }, {
        "name": "birthDate",
        "type": [ "long", "null" ],
        "meta": "dbFieldName=BIRTH_DATE;dbFieldPosition=4;"
      }, {
        "name": "deleted",
        "type": [ "string", "null" ],
        "meta": "dbFieldName=DELETED;dbFieldPosition=5;"
      } ]
    }

  3. Start the relay
    startup.sh
    ./bin/startup.sh relay
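The Avro schema from step 2 ties each field to a view column through its meta string (dbFieldName and dbFieldPosition), and the record-level meta names the primary key (pk=key). As a sketch under stated assumptions, the Python below parses those meta strings and compares them with the column order of the sy$person view, which can be handy before restarting the relay after a schema change. `parse_meta` and `check_schema_against_view` are hypothetical helpers; this is not how Databus itself validates schemas.

```python
from __future__ import annotations

import json


def parse_meta(meta: str) -> dict[str, str]:
    """Turn a meta string such as 'dbFieldName=TXN;dbFieldPosition=0;' into a dict."""
    return dict(item.split("=", 1) for item in meta.split(";") if item)


def check_schema_against_view(schema_json: str, view_columns: list[str]) -> list[str]:
    """Report fields whose dbFieldName/dbFieldPosition disagree with the view."""
    schema = json.loads(schema_json)
    problems = []
    # The record-level meta (e.g. "pk=key") must name one of the schema fields.
    field_names = {f["name"] for f in schema["fields"]}
    pk = parse_meta(schema.get("meta", "")).get("pk")
    if pk not in field_names:
        problems.append(f"record meta pk={pk!r} does not name a schema field")
    columns = [c.upper() for c in view_columns]
    for f in schema["fields"]:
        meta = parse_meta(f.get("meta", ""))
        name = meta.get("dbFieldName")
        pos = int(meta.get("dbFieldPosition", -1))
        actual = columns[pos] if 0 <= pos < len(columns) else None
        if name != actual:
            problems.append(
                f"field {f['name']}: meta expects {name} at position {pos}, view has {actual}"
            )
    return problems
```

With the book.avsc above and the view columns `txn, key, first_name, last_name, birth_date, deleted`, every field lines up; swapping two columns in the view would be reported for both affected fields.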

With that, both the relay and the database are configured and deployed.


Publisher: 全栈程序员栈长. When reposting, please credit the source: https://javaforall.cn/181396.html (original link: https://javaforall.cn)
