最简单的canal 1.1.6服务搭建方法
nanshan 2025-05-23 18:53 27 浏览 0 评论
前言:
因为在项目中集成了ElasticSearch,用于某些业务场景的搜索或筛选。这里关于ElasticSearch就不做介绍了,虽然解决了全文搜索的性能问题。但是当出现一些频繁更新的数据放置在ES就有点麻烦了。
这时候,一款能把MySQL数据即时同步到ElasticSearch的工具就显得格外重要了。经过比较筛选我选择了阿里的canal,这里应该就有人会说用Elastic全家桶的Logstash或filebeat不是更好吗!接下来就会介绍我为什么选canal以及最重要的canal的本地搭建(零基础视角)。
对比:
作为增量数据消费,应用于各种场景都有与之对应领域比较好的工具。比如上面的Logstash和filebeat虽都同属于Elastic但也截然不同,更别说flume等等工具。我选择canal的原因是他有适配器,只要是MySQL同步场景,对面是一个能储存的都可以,比如文件,队列,数据仓库,ES等等都可以。而其他的更多的应用场景可能是定时的日志采集。
场景:
1. 实时同步MySQL数据到ElasticSearch
2. redis缓存的即时更新
3. 业务上商品订阅降价等等
下载:
1. 地址:
github上搜索Canal即可
2. github上如何下载源码发行包
刚开始打开canal没看到的可能只是工具的源码,但是在windows下需要工具包。源码编译打包对于新手来说还不是时候,所以就介绍一下github等其他版本仓库别人开源的工具类代码如何下载发行包。
(1). 进入github仓库主页,一般是在右上角有一个releases超链接,点击进入就有各迭代版本的发行包介绍和下载资源了。
3. 哪个才是canal服务的工具包?
点击进去可能看到的有如下好几个包,而给我们开发语言(客户端)能提供服务的是deployer,另外几个分别是管控台和适配器等,以后有时间再介绍他们的用途和安装方法。
配置:
1. MySQL的binlog开启
因为canal通过伪装成MySQL一个slave,通过dump协议与master通讯,并解析MySQL的binlog文件。canal的工作原理和MySQL的binlog开启方法这里就不做介绍了,网上都比较多。
2. canal实例的主要的几个配置
(1). MySQL账户, conf/example/instance.properties
canal.instance.dbUsername=canal // 数据库账户
canal.instance.dbPassword=canal // 数据库密码
(2). 数据库新建上面账户并授权
#创建用户
CREATE USER canal IDENTIFIED BY 'canal';
#创建权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
(3). canal服务端口账户,conf/canal.properties
# tcp指定的IP, 不填表示0.0.0.0
canal.ip =
# register ip to zookeeper
canal.register.ip =
# canal服务端口
canal.port = 11111
canal.metrics.pull.port = 11112
# canal 服务的账号密码,注释表示客户端连接无需账号密码
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
下载完canal deployer后,如果只是作为客户端请求测试,需要配置也就是上面两个文件(conf/example/instance.properties 和 conf/canal.properties), 除了上面按照自己的配置,其他的都保持原有参数不变即可。
启动:
windows下启动,打开cmd,进入根目录下的bin,然后运行startup.bat就可以。启动成功与否可以在logs目录的两个日志文件中查看,如果有Error字眼,一般就是配置有问题,可以根据具体报错具体查找原因。
测试:
因为canal是Java开发的,所以测试也采用Java作为客户端打印一下实时解析binlog的结果。不过在跑Java程序前,windows可以通过以下两个命令查看canal启动情况。
telnet 127.0.0.1 11111
netstat -ano | findstr "11111"
接下来也依然以新手的视角(因为以前都是写PHP,所以看我文件的应该也都是PHPer,所以熟悉的同学们可以复制下面代码测试)创建Java项目,构建Jar包,编写canal客户端,编译运行等等。
1. 打开IntekkiJ IDEA, 创建一个Maven项目。
2. 打开pom.xml添加以下依赖。
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
</dependencies>
3. 打开编辑器右上角的Maven按钮,并按刷新,等待下载依赖。
4. src/main/java 新建一个 Java类文件,粘贴以下代码。
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
public class CanalClientTest {
public static void main(String args[]) {
// 创建链接,这里就需要canal里配置的端口,账号密码,destination默认先example就行
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1",11111),
"example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
// 防止 deserializer failed报错
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
connector.subscribe();
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
5. 右击该文件Run, 开始编译运行,并打开运行台,然后再在数据库里随便找一张表,随便修改以下数据,就可以实时查看变动的信息了。
相关推荐
- 使用nginx配置域名及禁止直接通过IP访问网站
-
前段时间刚搭建好这个网站,一直没有关注一个问题,那就是IP地址也可以访问我的网站,今天就专门研究了一下nginx配置问题,争取把这个问题研究透彻。1.nginx配置域名及禁止直接通过IP访问先来看n...
- 如何在 Linux 中使用 PID 号查找进程名称?
-
在Linux的复杂世界中,进程是系统运行的核心,每个进程都由一个唯一的「进程ID」(PID)标识。无论是系统管理员在排查失控进程,还是开发者在调试应用程序,知道如何将PID映射到对应的进程名称都是一项...
- Linux服务器硬件信息查询与日常运维命令总结
-
1.服务器硬件信息查询1.1CPU信息查询命令功能描述示例lscpu显示CPU架构、核心数、线程数等lscpucat/proc/cpuinfo详细CPU信息(型号、缓存、频率)cat/proc/c...
- Ubuntu 操作系统常用命令详解(ubuntu常用的50个命令)
-
UbuntuLinux是一款流行的开源操作系统,广泛应用于服务器、开发、学习等场景。命令行是Ubuntu的灵魂,也是高效、稳定管理系统的利器。本文按照各大常用领域,详细总结Ubuntu必学...
- 从 0 到 1:打造基于 Linux 的私有 API 网关平台
-
在当今微服务架构盛行的时代,API网关作为服务入口和安全屏障,其重要性日益凸显。你是否想过,不依赖商业方案,完全基于开源组件,在Linux上构建一个属于自己的私有API网关平台?今天就带你...
- Nginx搭建简单直播服务器(nginx 直播服务器搭建)
-
前言使用Nginx+Nginx-rtmp-module在Ubuntu中搭建简单的rtmp推流直播服务器。服务器环境Ubuntu16.04相关概念RTMP:RTMP协议是RealTi...
- Linux连不上网?远程卡?这篇网络管理指南你不能错过!
-
大家好!今天咱们聊个所有Linux用户都躲不开的“老大难”——网络管理。我猜你肯定遇到过这些崩溃时刻:新装的Linux系统连不上Wi-Fi,急得直拍桌子;远程服务器SSH连不上,提示“Connecti...
- 7天从0到上线!手把手教你用Python Flask打造爆款Web服务
-
一、为什么全网开发者都在疯学Flask?在当今Web开发的战场,Flask就像一把“瑞士军刀”——轻量级架构让新手3天速成,灵活扩展能力又能支撑百万级用户项目!对比Django的“重型装甲”,Flas...
- nginx配置文件详解(nginx反向代理配置详解)
-
Nginx是一个强大的免费开源的HTTP服务器和反向代理服务器。在Web开发项目中,nginx常用作为静态文件服务器处理静态文件,并负责将动态请求转发至应用服务器(如Django,Flask,et...
- 30 分钟搞定 Docker 安装与 Nginx 部署,轻松搭建高效 Web 服务
-
在云计算时代,利用容器技术快速部署应用已成为开发者必备技能。本文将手把手教你在阿里云轻量应用服务器上,通过Docker高效部署Nginx并发布静态网站,全程可视化操作,新手也能轻松上手!一、准...
- Nginx 配置实战:从摸鱼到部署,手把手教你搞定生产级配置
-
各位摸鱼搭子们!今天咱不聊代码里的NullPointerException,改聊点「摸鱼必备生存技能」——Nginx配置!先灵魂拷问一下:写了一堆接口却不会部署?服务器被恶意请求打崩过?静态资源加载...
- 如何使用 Daphne + Nginx + supervisor部署 Django
-
前言:从Django3.0开始支持ASGI应用程序运行,使Django完全具有异步功能。Django目前已经更新到5.0,对异步支持也越来越好。但是,异步功能将仅对在ASGI下运行的应用程序可用...
- Docker命令最全详解(39个最常用命令)
-
Docker是云原生的核心,也是大厂的必备技能,下面我就全面来详解Docker核心命令@mikechen本文作者:陈睿|mikechen文章来源:mikechen.cc一、Docker基本命令doc...
- ubuntu中如何查看是否已经安装了nginx
-
在Ubuntu系统中,可以通过以下几种方法检查是否已安装Nginx:方法1:使用dpkg命令(适用于Debian/Ubuntu)bashdpkg-l|grepnginx输出...
- OVN 概念与实践(德育概念的泛化在理论和实践中有什么弊端?)
-
今天我们来讲解OVN的概念和基础实践,要理解本篇博客的内容,需要前置学习:Linux网络设备-Bridge&VethPairLinux网络设备-Bridge详解OVS+Fa...
你 发表评论:
欢迎- 一周热门
-
-
UOS服务器操作系统防火墙设置(uos20关闭防火墙)
-
极空间如何无损移机,新Z4 Pro又有哪些升级?极空间Z4 Pro深度体验
-
手机如何设置与显示准确时间的详细指南
-
NAS:DS video/DS file/DS photo等群晖移动端APP远程访问的教程
-
如何在安装前及安装后修改黑群晖的Mac地址和Sn系列号
-
如何修复用户配置文件服务在 WINDOWS 上登录失败的问题
-
一加手机与电脑互传文件的便捷方法FileDash
-
日本海上自卫队的军衔制度(日本海上自卫队的军衔制度是什么)
-
10个免费文件中转服务站,分享文件简单方便,你知道几个?
-
爱折腾的特斯拉车主必看!手把手教你TESLAMATE的备份和恢复
-
- 最近发表
-
- 使用nginx配置域名及禁止直接通过IP访问网站
- 如何在 Linux 中使用 PID 号查找进程名称?
- Linux服务器硬件信息查询与日常运维命令总结
- Ubuntu 操作系统常用命令详解(ubuntu常用的50个命令)
- 从 0 到 1:打造基于 Linux 的私有 API 网关平台
- Nginx搭建简单直播服务器(nginx 直播服务器搭建)
- Linux连不上网?远程卡?这篇网络管理指南你不能错过!
- 7天从0到上线!手把手教你用Python Flask打造爆款Web服务
- nginx配置文件详解(nginx反向代理配置详解)
- 30 分钟搞定 Docker 安装与 Nginx 部署,轻松搭建高效 Web 服务
- 标签列表
-
- linux 查询端口号 (58)
- docker映射容器目录到宿主机 (66)
- 杀端口 (60)
- yum更换阿里源 (62)
- internet explorer 增强的安全配置已启用 (65)
- linux自动挂载 (56)
- 禁用selinux (55)
- sysv-rc-conf (69)
- ubuntu防火墙状态查看 (64)
- windows server 2022激活密钥 (56)
- 无法与服务器建立安全连接是什么意思 (74)
- 443/80端口被占用怎么解决 (56)
- ping无法访问目标主机怎么解决 (58)
- fdatasync (59)
- 405 not allowed (56)
- 免备案虚拟主机zxhost (55)
- linux根据pid查看进程 (60)
- dhcp工具 (62)
- mysql 1045 (57)
- 宝塔远程工具 (56)
- ssh服务器拒绝了密码 请再试一次 (56)
- ubuntu卸载docker (56)
- linux查看nginx状态 (63)
- tomcat 乱码 (76)
- 2008r2激活序列号 (65)