本文共 9310 字,大约阅读时间需要 31 分钟。
场景说明:因电商业务对高并发与实时性的要求,需要将多库多表数据实时同步到Elasticsearch,并实现其他消息通知功能。早期的实现方式主要是基于业务 trigger 获取增量变更;之后业务逐步尝试通过解析数据库日志获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。生产环境使用的Canal版本为1.1.4,MySQL版本为8.x。
基于日志增量订阅和消费的业务包括
整个项目以商品中心为原型:打造真正意义上的大型电商商品中心,商品中心囊括了Elasticsearch,Canal,RocketMq,ShardingProxy的实战应用,后续逐步展开。
本章提供项目源码包括:search-common,sharding-event-canal,sharding-event-plugin,其他后续完善后逐节提供出来。项目功能包括:Canal数据监听拉取,分片数据解析,数据消费(建议使用RocketMq)。有关Canal的安装,。

/**
 * Worker that keeps a single Canal destination connected and pulls its change batches.
 *
 * <p>The runner loops for as long as the owning {@link CanalEntryEventConsumer} reports
 * {@code isRunning()} and the worker thread has not been interrupted. Each pulled batch is
 * handed to the consumer; the batch is acknowledged when the consumer returns {@code true}
 * and rolled back when it returns {@code false}.
 */
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class CanalEntryPullRunner implements Runnable {

    /** Delay before the first connection attempt, in milliseconds. */
    private static final long STARTUP_DELAY_MILLIS = 5000L;

    /** Canal destination name. */
    private String destination;

    /** Configuration of this Canal destination. */
    private CanalDestinationConf conf;

    /** The actual Canal connection. */
    private CanalConnector connector;

    /** Consumer of Canal entry events; its running flag is polled by this runner. */
    private CanalEntryEventConsumer consumer;

    /**
     * Connects, subscribes and pulls until the consumer stops or the thread is interrupted.
     * Any exception raised while connected is logged and followed by a sleep of
     * {@code conf.getRetryConnectSleepMillis()} before the next connection attempt.
     */
    @Override
    public void run() {
        readyToStart();
        while (shouldContinue()) {
            try {
                connector.connect();
                connector.subscribe(conf.getFilterRegexExpression());
                // NOTE(review): presumably rolls back batches fetched but never acknowledged by a
                // previous session so they are delivered again - confirm against the Canal client docs.
                connector.rollback();
                log.info("Canal 连接成功,destination={},filter={}", destination, conf.getFilterRegexExpression());
                pullMessage();
            } catch (Exception e) {
                log.error("Canal连接错误,destination={},cause={}", destination, Throwables.getStackTraceAsString(e));
                try {
                    Thread.sleep(conf.getRetryConnectSleepMillis());
                } catch (InterruptedException ie) {
                    log.debug(ie.getMessage(), ie);
                    // Restore the flag so the interruption is not lost, then leave the retry loop.
                    Thread.currentThread().interrupt();
                    break;
                }
                disconnectQuietly();
            }
        }
        disconnectQuietly();
    }

    /**
     * Pulls batches without auto-ack until the runner should stop.
     *
     * <p>Uses the blocking variant of {@code getWithoutAck} only when
     * {@code conf.getBlockWaitMillis()} is positive. An empty batch makes the loop sleep for
     * {@code conf.getIdleSpinMillis()} before acknowledging it.
     */
    private void pullMessage() {
        while (shouldContinue()) {
            Message message = conf.getBlockWaitMillis() <= 0
                    ? connector.getWithoutAck(conf.getPullBatchSize())
                    : connector.getWithoutAck(conf.getPullBatchSize(), conf.getBlockWaitMillis(), TimeUnit.MILLISECONDS);
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (-1 == batchId || size <= 0) {
                try {
                    Thread.sleep(conf.getIdleSpinMillis());
                } catch (InterruptedException e) {
                    log.debug(e.getMessage(), e);
                    // Keep the interrupt visible to run() so it does not reconnect and spin.
                    Thread.currentThread().interrupt();
                    return;
                }
                // NOTE(review): this acknowledges even when batchId is -1; confirm the server ignores it.
                connector.ack(batchId);
            } else if (consumer.onCanalEntryEvent(message.getEntries())) {
                // The consumer handles its own exceptions: ack on success, roll back on failure.
                connector.ack(batchId);
            } else {
                // NOTE(review): the next iteration pulls again immediately, with no back-off
                // after a rollback - confirm that is intended when the consumer keeps failing.
                connector.rollback(batchId);
            }
        }
    }

    /** Waits {@link #STARTUP_DELAY_MILLIS} before the first connection attempt. */
    private void readyToStart() {
        try {
            Thread.sleep(STARTUP_DELAY_MILLIS);
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true while the consumer is running and this thread has not been interrupted. */
    private boolean shouldContinue() {
        return consumer.isRunning() && !Thread.currentThread().isInterrupted();
    }

    /** Disconnects and logs, instead of propagating, any failure so it cannot escape the worker thread. */
    private void disconnectQuietly() {
        try {
            connector.disconnect();
        } catch (Exception e) {
            log.warn(e.getMessage(), e);
        }
    }
}
@Slf4jpublic final class CanalEntryParser { private static CanalShardingMapper shardingMapper = null; /** * 将Canal变更数据Entry,转换为DataEvent * @param entry 变更数据Entry * @return DataEvent */ public static DataEvent parse(CanalEntry.Entry entry){ if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA){ return null; } CanalEntry.RowChange rowChange = rowChange(entry); if (null == rowChange){ return null; } CanalEntry.Header header = entry.getHeader(); DataEvent event = new DataEvent(); event.setSource(EventSource.CANAL.name()); event.setTimestamp(header.getExecuteTime()); event.setSchema(shardingMapper().convertLogicSchemaName(header.getSchemaName())); event.setTable(shardingMapper().convertLogicTableName(header.getSchemaName(), header.getTableName())); switch (header.getEventType()){ case INSERT: event.setType(EventType.ADD); event.setRows(parseAfterRowEvent(rowChange)); return event; case UPDATE: event.setType(EventType.UPD); event.setRows(parseRowEvent(rowChange)); return event; case DELETE: event.setType(EventType.DEL); event.setRows(parseBeforeRowEvent(rowChange)); return event; default: return null; } } /** * Canal Entry变更处理 * @param entry Entry项 * @return */ private static CanalEntry.RowChange rowChange(CanalEntry.Entry entry){ CanalEntry.RowChange rowChange; try{ rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); }catch (InvalidProtocolBufferException e){ log.error(e.getMessage(), e); return null; } int rowSize = rowChange.getRowDatasCount(); if (rowChange.getIsDdl() || rowSize <= 0){ //DDL操作 return null; } return rowChange; } /** * 解析Canal变更行数据,返回行List * @param rowChange 变更行 * @return */ private static ListparseRowEvent(CanalEntry.RowChange rowChange){ List eventList = Lists.newArrayListWithExpectedSize(rowChange.getRowDatasCount()); rowChange.getRowDatasList().forEach(rowData -> { RowEvent event = new RowEvent(); event.setBefore(parseGenericMap(rowData.getBeforeColumnsList())); event.setAfter(parseGenericMap(rowData.getAfterColumnsList())); 
eventList.add(event); }); return eventList; } private static List parseBeforeRowEvent(CanalEntry.RowChange rowChange){ List eventList = Lists.newArrayListWithExpectedSize(rowChange.getRowDatasCount()); rowChange.getRowDatasList().forEach(rowData -> { RowEvent event = new RowEvent(); event.setBefore(parseGenericMap(rowData.getBeforeColumnsList())); eventList.add(event); }); return eventList; } private static List parseAfterRowEvent(CanalEntry.RowChange rowChange){ List eventList = Lists.newArrayListWithExpectedSize(rowChange.getRowDatasCount()); rowChange.getRowDatasList().forEach(rowData -> { RowEvent event = new RowEvent(); event.setAfter(parseGenericMap(rowData.getAfterColumnsList())); eventList.add(event); }); return eventList; } /** * 解析Table变更行数据,返回Map * @param columns 行所有列 * @return Map */ private static GenericMap parseGenericMap(List columns){ GenericMap genericMap = new GenericMap(columns.size()); columns.forEach(column -> genericMap.put(column.getName(), SqlTypeUtils.parseObject(column.getSqlType(), column.getValue()))); return genericMap; } private static CanalShardingMapper shardingMapper(){ if (null == shardingMapper){ shardingMapper = SpringBeanUtils.getBean(CanalShardingMapper.class); } return shardingMapper; }}
@Slf4jpublic class CanalEntryEventConsumer { @Autowired private CanalEntryProperties properties; @Autowired private EventMulticaster eventMulticaster; private volatile boolean running; /**拉取Canal事件数据线程池*/ private ExecutorService executorService; @PostConstruct public void start(){ String addressStr = properties.getAddress(); log.info("Canal Address:{}", addressStr); CanalAddressUtils.Protocol protocol = CanalAddressUtils.parse(addressStr); //开启运行状态 running = true; //noinspection unchecked executorService = Executors.newCachedThreadPool(new NamedThreadFactory("Canal-Event-Worker")); //检查destination配置 MapdestinationConfMap = properties.getDestinationMap(); destinationConfMap.forEach((dest, conf) -> { CanalConnector connector = protocol == CanalAddressUtils.Protocol.P2P ? CanalConnectors.newSingleConnector(CanalAddressUtils.p2pServer(addressStr), dest, properties.getUsername(), properties.getPassword()) : CanalConnectors.newClusterConnector(CanalAddressUtils.zookeeperServerAddress(addressStr), dest, properties.getUsername(), properties.getPassword()); Runnable runner = new CanalEntryPullRunner(dest, conf, connector, this); executorService.execute(runner); }); } /** * 响应处理Canal Entry Event * @param entryList 变更entry列表 * @return */ public boolean onCanalEntryEvent(List entryList) { try { log.info("Canal Entry Change Size:{}", entryList.size()); entryList.stream() .map(CanalEntryParser::parse) .filter(Objects::nonNull) .forEach(e -> eventMulticaster.publishEvent(e)); return true; } catch (Exception e) { log.error(e.getMessage(), e); return false; } } @PreDestroy public void stop(){ running = false; executorService.shutdown(); } public boolean isRunning(){ return running; }}
,。
转载地址:http://qccpi.baihongyu.com/