博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【商城】canal数据库数据实时同步利器-代码实战干货
阅读量:4119 次
发布时间:2019-05-25

本文共 9310 字,大约阅读时间需要 31 分钟。

场景说明,因电商业务高并发与实时性要求 ,需要实现多库多表数据实时同步到Elasticsearch,并实现其他消息通知功能。实现方式主要是基于业务 trigger 获取增量变更。业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。生产使用Canal版本为1.1.4,MySQL版本为8.x。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理
    Canal实战场景应用

1、项目说明

整个项目以商品中心为原型:打造真正意义上的大型电商商品中心,商品中心囊括了Elasticsearch,Canal,RocketMq,ShardingProxy的实战应用,后续逐步展开。

项目说明
项目说明
本章提供项目源码包括:search-common,sharding-event-canal,sharding-event-plugin,其他后续完善后逐节提供出来。项目功能包括:Canal数据监听拉取,分片数据解析,数据消费(建议使用RocketMq)。有关Canal的安装,。

2、Canal事件监听,数据拉取

@Slf4j@NoArgsConstructor@AllArgsConstructorpublic class CanalEntryPullRunner implements Runnable {    /**     * Canal Destination名称     */    private String destination;    /**     * Canal Destination 配置     */    private CanalDestinationConf conf;    /**     * Canal 真实连接对象     */    private CanalConnector connector;    /**     * Canal Entry 事件消费者     */    private CanalEntryEventConsumer consumer;    @Override    public void run() {        readyToStart();        while (consumer.isRunning()){            try{                connector.connect();                connector.subscribe(conf.getFilterRegexExpression());                connector.rollback();                log.info("Canal 连接成功,destination={},filter={}", destination, conf.getFilterRegexExpression());                pullMessage();            }catch (Exception e){                log.error("Canal连接错误,destination={},cause={}", destination, Throwables.getStackTraceAsString(e));                try{                    Thread.sleep(conf.getRetryConnectSleepMillis());                }catch (InterruptedException ie){                    log.debug(ie.getMessage(), ie);                    break;                }                try{                    connector.disconnect();                }catch (CanalClientException ce){                    log.warn(ce.getMessage(), ce);                }            }        }        connector.disconnect();    }    /**     * Pull Canal消息     * @throws Exception 异常处理     */    private void pullMessage(){        while (consumer.isRunning()) {            Message message = conf.getBlockWaitMillis() <= 0 ?                    connector.getWithoutAck(conf.getPullBatchSize()) :                    connector.getWithoutAck(conf.getPullBatchSize(), conf.getBlockWaitMillis(), TimeUnit.MILLISECONDS);            long batchId = message.getId();            int size = message.getEntries().size();            if (-1 == batchId || size <= 0) {                try {                    Thread.sleep(conf.getIdleSpinMillis());                } catch (InterruptedException e) {                    log.debug(e.getMessage(), e);                    break;                }                connector.ack(batchId);            } else {                //Consumer确保异常处理,成功ack,失败rollback                if (consumer.onCanalEntryEvent(message.getEntries())) {                    connector.ack(batchId);                } else {                    connector.rollback(batchId);                }            }        }    }    private void readyToStart(){        try {            //等待5seconds            Thread.sleep(5000L);        }catch (InterruptedException e){            log.error(e.getMessage(), e);        }    }}

3、Canal事件数据解析

@Slf4jpublic final class CanalEntryParser {    private static CanalShardingMapper shardingMapper = null;    /**     * 将Canal变更数据Entry,转换为DataEvent     * @param entry 变更数据Entry     * @return DataEvent     */    public static DataEvent parse(CanalEntry.Entry entry){        if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA){            return null;        }        CanalEntry.RowChange rowChange = rowChange(entry);        if (null == rowChange){            return null;        }        CanalEntry.Header header = entry.getHeader();        DataEvent event = new DataEvent();        event.setSource(EventSource.CANAL.name());        event.setTimestamp(header.getExecuteTime());        event.setSchema(shardingMapper().convertLogicSchemaName(header.getSchemaName()));        event.setTable(shardingMapper().convertLogicTableName(header.getSchemaName(), header.getTableName()));        switch (header.getEventType()){            case INSERT:                event.setType(EventType.ADD);                event.setRows(parseAfterRowEvent(rowChange));                return event;            case UPDATE:                event.setType(EventType.UPD);                event.setRows(parseRowEvent(rowChange));                return event;            case DELETE:                event.setType(EventType.DEL);                event.setRows(parseBeforeRowEvent(rowChange));                return event;            default:                return null;        }    }    /**     * Canal Entry变更处理     * @param entry Entry项     * @return     */    private static CanalEntry.RowChange rowChange(CanalEntry.Entry entry){        CanalEntry.RowChange rowChange;        try{            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());        }catch (InvalidProtocolBufferException e){            log.error(e.getMessage(), e);            return null;        }        int rowSize = rowChange.getRowDatasCount();        if (rowChange.getIsDdl() || rowSize <= 0){            //DDL操作            return null;        }        return rowChange;    }    /**     * 解析Canal变更行数据,返回行List     * @param rowChange 变更行     * @return     */    private static List
parseRowEvent(CanalEntry.RowChange rowChange){ List
eventList = Lists.newArrayListWithExpectedSize(rowChange.getRowDatasCount()); rowChange.getRowDatasList().forEach(rowData -> { RowEvent event = new RowEvent(); event.setBefore(parseGenericMap(rowData.getBeforeColumnsList())); event.setAfter(parseGenericMap(rowData.getAfterColumnsList())); eventList.add(event); }); return eventList; } private static List
parseBeforeRowEvent(CanalEntry.RowChange rowChange){ List
eventList = Lists.newArrayListWithExpectedSize(rowChange.getRowDatasCount()); rowChange.getRowDatasList().forEach(rowData -> { RowEvent event = new RowEvent(); event.setBefore(parseGenericMap(rowData.getBeforeColumnsList())); eventList.add(event); }); return eventList; } private static List
parseAfterRowEvent(CanalEntry.RowChange rowChange){ List
eventList = Lists.newArrayListWithExpectedSize(rowChange.getRowDatasCount()); rowChange.getRowDatasList().forEach(rowData -> { RowEvent event = new RowEvent(); event.setAfter(parseGenericMap(rowData.getAfterColumnsList())); eventList.add(event); }); return eventList; } /** * 解析Table变更行数据,返回Map
* @param columns 行所有列 * @return Map
*/ private static GenericMap parseGenericMap(List
columns){ GenericMap genericMap = new GenericMap(columns.size()); columns.forEach(column -> genericMap.put(column.getName(), SqlTypeUtils.parseObject(column.getSqlType(), column.getValue()))); return genericMap; } private static CanalShardingMapper shardingMapper(){ if (null == shardingMapper){ shardingMapper = SpringBeanUtils.getBean(CanalShardingMapper.class); } return shardingMapper; }}

4、Canal数据消费(支持MQ)

@Slf4jpublic class CanalEntryEventConsumer {    @Autowired    private CanalEntryProperties properties;    @Autowired    private EventMulticaster eventMulticaster;    private volatile boolean running;    /**拉取Canal事件数据线程池*/    private ExecutorService executorService;    @PostConstruct    public void start(){        String addressStr = properties.getAddress();        log.info("Canal Address:{}", addressStr);        CanalAddressUtils.Protocol protocol = CanalAddressUtils.parse(addressStr);        //开启运行状态        running = true;        //noinspection unchecked        executorService = Executors.newCachedThreadPool(new NamedThreadFactory("Canal-Event-Worker"));        //检查destination配置        Map
destinationConfMap = properties.getDestinationMap(); destinationConfMap.forEach((dest, conf) -> { CanalConnector connector = protocol == CanalAddressUtils.Protocol.P2P ? CanalConnectors.newSingleConnector(CanalAddressUtils.p2pServer(addressStr), dest, properties.getUsername(), properties.getPassword()) : CanalConnectors.newClusterConnector(CanalAddressUtils.zookeeperServerAddress(addressStr), dest, properties.getUsername(), properties.getPassword()); Runnable runner = new CanalEntryPullRunner(dest, conf, connector, this); executorService.execute(runner); }); } /** * 响应处理Canal Entry Event * @param entryList 变更entry列表 * @return */ public boolean onCanalEntryEvent(List
entryList) { try { log.info("Canal Entry Change Size:{}", entryList.size()); entryList.stream() .map(CanalEntryParser::parse) .filter(Objects::nonNull) .forEach(e -> eventMulticaster.publishEvent(e)); return true; } catch (Exception e) { log.error(e.getMessage(), e); return false; } } @PreDestroy public void stop(){ running = false; executorService.shutdown(); } public boolean isRunning(){ return running; }}

5、项目下载地址

,。

转载地址:http://qccpi.baihongyu.com/

你可能感兴趣的文章
《融入动画技术的交互应用》主题博文推荐
查看>>
链睿和家乐福合作推出下一代零售业隐私保护技术
查看>>
Unifrax宣布新建SiFAB™生产线
查看>>
艾默生纪念谷轮™在空调和制冷领域的百年创新成就
查看>>
NEXO代币持有者获得20,428,359.89美元股息
查看>>
Piper Sandler为EverArc收购Perimeter Solutions提供咨询服务
查看>>
RMRK筹集600万美元,用于在Polkadot上建立先进的NFT系统标准
查看>>
JavaSE_day12 集合
查看>>
JavaSE_day14 集合中的Map集合_键值映射关系
查看>>
Day_15JavaSE 异常
查看>>
异常 Java学习Day_15
查看>>
JavaSE_day_03 方法
查看>>
day-03JavaSE_循环
查看>>
Mysql初始化的命令
查看>>
day_21_0817_Mysql
查看>>
day-22 mysql_SQL 结构化查询语言
查看>>
MySQL关键字的些许问题
查看>>
浅谈HTML
查看>>
css基础
查看>>
HTML&CSS进阶
查看>>