百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

最简单的canal 1.1.6服务搭建方法

nanshan 2025-05-23 18:53 4 浏览 0 评论

前言:

因为在项目中集成了ElasticSearch,用于某些业务场景的搜索或筛选。这里关于ElasticSearch就不做介绍了,虽然解决了全文搜索的性能问题。但是当出现一些频繁更新的数据放置在ES就有点麻烦了。

这时候,一款能把MySQL数据即时同步到ElasticSearch的工具就显得格外重要了。经过比较筛选我选择了阿里的canal,这里应该就有人会说用Elastic全家桶的Logstash或filebeat不是更好吗!接下来就会介绍我为什么选canal以及最重要的canal的本地搭建(零基础视角)。

对比:

作为增量数据消费,应用于各种场景都有与之对应领域比较好的工具。比如上面的Logstash和filebeat虽都同属于Elastic但也截然不同,更别说flume等等工具。我选择canal的原因是他有适配器,只要是MySQL同步场景,对面是一个能储存的都可以,比如文件,队列,数据仓库,ES等等都可以。而其他的更多的应用场景可能是定时的日志采集。

场景:

1. 实时同步MySQL数据到ElasticSearch

2. redis缓存的即时更新

3. 业务上商品订阅降价等等

下载:

1. 地址:

github上搜索Canal即可

2. github上如何下载源码发行包

刚开始打开canal没看到的可能只是工具的源码,但是在windows下需要工具包。源码编译打包对于新手来说还不是时候,所以就介绍一下github等其他版本仓库别人开源的工具类代码如何下载发行包。

(1). 进入github仓库主页,一般是在右上角有一个releases超链接,点击进入就有各迭代版本的发行包介绍和下载资源了。

3. 哪个才是canal服务的工具包?

点击进去可能看到的有如下好几个包,而给我们开发语言(客户端)能提供服务的是deployer,另外几个分别是管控台和适配器等,以后有时间再介绍他们的用途和安装方法。

配置:

1. MySQL的binlog开启

因为canal通过伪装成MySQL一个slave,通过dump协议与master通讯,并解析MySQL的binlog文件。canal的工作原理和MySQL的binlog开启方法这里就不做介绍了,网上都比较多。

2. canal实例的主要的几个配置

(1). MySQL账户, conf/example/instance.properties

canal.instance.dbUsername=canal     // 数据库账户
canal.instance.dbPassword=canal     // 数据库密码

(2). 数据库新建上面账户并授权

#创建用户
CREATE USER canal IDENTIFIED BY 'canal';  
#创建权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

(3). canal服务端口账户,conf/canal.properties

# tcp指定的IP, 不填表示0.0.0.0
canal.ip =

# register ip to zookeeper
canal.register.ip =

# canal服务端口
canal.port = 11111
canal.metrics.pull.port = 11112

# canal 服务的账号密码,注释表示客户端连接无需账号密码
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

下载完canal deployer后,如果只是作为客户端请求测试,需要配置也就是上面两个文件(conf/example/instance.properties 和 conf/canal.properties), 除了上面按照自己的配置,其他的都保持原有参数不变即可。

启动:

windows下启动,打开cmd,进入根目录下的bin,然后运行startup.bat就可以。启动成功与否可以在logs目录的两个日志文件中查看,如果有Error字眼,一般就是配置有问题,可以根据具体报错具体查找原因。

测试:

因为canal是Java开发的,所以测试也采用Java作为客户端打印一下实时解析binlog的结果。不过在跑Java程序前,windows可以通过以下两个命令查看canal启动情况。

telnet 127.0.0.1 11111
netstat -ano | findstr "11111"

接下来也依然以新手的视角(因为以前都是写PHP,所以看我文件的应该也都是PHPer,所以熟悉的同学们可以复制下面代码测试)创建Java项目,构建Jar包,编写canal客户端,编译运行等等。

1. 打开IntekkiJ IDEA, 创建一个Maven项目。

2. 打开pom.xml添加以下依赖。

<dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
    </dependencies>

3. 打开编辑器右上角的Maven按钮,并按刷新,等待下载依赖。

4. src/main/java 新建一个 Java类文件,粘贴以下代码。

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class CanalClientTest {


    public static void main(String args[]) {
        // 创建链接,这里就需要canal里配置的端口,账号密码,destination默认先example就行
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1",11111),
                "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;

        try {
            connector.connect();
            // 防止 deserializer failed报错
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;

            while (emptyCount < totalEmptyCount) {
                connector.subscribe();

                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }

                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");

        } finally {

            connector.disconnect();
        }

    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

5. 右击该文件Run, 开始编译运行,并打开运行台,然后再在数据库里随便找一张表,随便修改以下数据,就可以实时查看变动的信息了。

相关推荐

电脑cpu占用率高?怎么办?1分钟快速解决!

案例:电脑cup过高怎么办?【我的电脑运行缓慢,导致我学习和工作的效率很低。刚刚查看了一下电脑,发现它的cpu占用率很高。有没有小伙伴知道如何解决此电脑cpu过高的问题?】电脑是我们生活中不可缺少的工...

CPU使用率100%怎么办

当电脑的CPU使用率达到100%时,往往会引发一系列令人头疼的问题,如卡顿、过载、过热甚至死机。这些问题不仅严重影响了电脑的正常使用,还可能对硬件造成损害。为了有效应对这一挑战,我们可以采取一系列措施...

提高CPU利用率方法

一、背景:一般小项目服务器的虚拟机服务器CPU很难达到要求的,要求一般都是使用率达到60%-90%,除非是数据库服务器,还有计算很频繁的应用服务器,不然是大部分的都不能达到要求的,无法达到要求,就得是...

Go到Rust:代码对比揭示60% CPU使用率降低的技术路径

Go与Rust作为现代系统级编程语言,在并发处理和内存管理上采取了截然不同的设计哲学。本文通过四个典型场景的代码对比,剖析两种语言在CPU效率层面的核心差异,揭示为何部分技术团队通过语言迁移实现了60...

一招教你解决CPU占用率100%的问题 #电脑小技巧

大家好,今天讲一下CPU占用率100%的解决方法。·首先点运行,在这块输入gpedit.msc回车。·打开管理模板,Windows组件,MicrosoftDefender防病毒。·点开扫描,扫描期间...

技术丨教你降低CPU与内存占用率,让系统快如闪电

当内存和CPU都达到了较大的占用率时,很可能会导致系统崩溃。该如何解决这一问题?本期视频将指导大家:如何有效减少内存和CPU的占用率。快来看看具体操作步骤吧!1.尝试运行ePSA硬件检测首先,尝试运...

Serv00服务器搭建代理节点全流程|无需保号保活|Cloudflare隧道

注册图文教程(2024)「链接」视频教程BiliBili:Serv00服务器搭建代理节点全流程|无需保号保活|Cloudflare隧道|serv00-play脚本_哔哩哔哩_bilibiliS...

600+ 道 Java面试题及答案整理(建议收藏)

小七整理了最近几年最新、最全的Java面试题,题目涉及Java基础、集合、多线程、IO、分布式、Spring全家桶、MyBatis、Dubbo、缓存、消息队列、Linux…等等。题库共6...

网络编程 | 彻底搞懂网络 IO 模型

令人头大的IO说起网络IO相关的开发,很多人都头大,包括我自己,写了几年的代码,对IO相关的术语说起来也是头头是道,什么NIO、IO多路复用等术语一个接一个。但是也就自己知道,这些概念一...

开源全方位运维监控工具:HertzBeat

HertzBeat:实时监控系统性能,精准预警保障业务稳定-精选真开源,释放新价值。概览HertzBeat是一款深受广大开发者喜爱的开源实时监控解决方案。它以其简洁直观的设计理念和免安装Agent的...

网络安全工程师必知的75个网络端口

作为一名网络安全工程师,必须熟知网络端口,一般将端口分为以下3类:(1)公认端口(Well-KnownPorts):范围从0到1023(2)注册端口(RegisteredPorts):从1024到...

PHP技能评测

公司出了一些自我评测的PHP题目,现将题目和答案记录于此,以方便记忆。1.魔术函数有哪些,分别在什么时候调用?__construct(),类的构造函数__destruct(),类的析构函数__cal...

2020年Dubbo30道高频面试题!还在为面试烦恼赶快来看看

前言Dubbo是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案。简单的说,dubbo就是个服务框架,如果没有分布式的需求,其实是不需要用的,只有在分布式的...

2018年度回顾:挖矿木马为什么会成为病毒木马黑产的中坚力量

一、概述根据腾讯御见威胁情报中心监测数据,2018年挖矿木马样本月产生数量在百万级别,且上半年呈现快速增长趋势,下半年上涨趋势有所减缓。由于挖矿的收益可以通过数字加密货币系统结算,使黑色产业变现链条十...

自查风险突出的30个服务高危端口

在计算机网络中,端口是一种用于区分不同网络服务或应用程序的逻辑地址。每个网络服务或应用程序都需要至少一个端口(号)来实现网络通信。当某个端口开放时,便能接收来自于其它计算机或网络设备的连接请求和数据。...

取消回复欢迎 发表评论: