- 😄 My blog: http://yifanstar.top/
- 📫 How to reach me: [email protected]
yifanzheng / bigdata-notes Goto Github PK
View Code? Open in Web Editor NEWThis project forked from heibaiying/bigdata-notes
大数据入门指南 :star:
This project forked from heibaiying/bigdata-notes
大数据入门指南 :star:
Flink 的 checkpoint(检查点) 和故障恢复算法保证了故障发生后应用状态的一致性。Flink 以固定的时间间隔生成 checkpoint,然后将 checkpoint 存储(异步发生的)在一个分布式的持久化系统中。如果发生故障,Flink 就会从最近一次成功的 checkpoint 中将应用程序的状态进行恢复,并从恢复的读取点继续处理,就好像什么事情没有发生一样。Flink 的状态存储在 Flink 的内部,这样做的好处就是不再依赖外部系统,降低了对外部系统的依赖性。
Flink 利用两阶段提交协议提供端到端的 Exactly-Once 语义。这意味着,外部系统必须提供提交或回滚的方法,然后通过 Flink 的检查点机制来协调。
Flink checkpoint 的启动表示两阶段提交协议的预提交阶段。当 checkpoint 启动时,Flink JobManager 会从数据源产生 checkpoint barrier。checkpoint barrier 在算子之间传递,对于每个算子,它会触发算子状态后端生成状态的快照。当一个进程具有外部状态(External state)时,状态处理会有所不同。外部状态通常以写入外部系统(如 Kafka)的形式出现。在这种情况下,在预提交阶段,除了将其状态写入状态后端之外,sink 还必须预先提交其外部事务。
当 checkpoint barrier 通过所有算子并且触发的快照回调成功完成时,预提交阶段结束。所有触发的状态快照都被视为该检查点的一部分。检查点是整个应用程序状态的快照,包括预先提交的外部状态。如果发生故障,我们可以回滚到上次成功完成快照的时间点。
下一步是通知所有算子检查点已成功完成。这是两阶段提交协议的提交阶段,JobManager 为应用程序中的每个算子发出检查点完成的回调。
在真实的生产环境中,数据的传输会经过很多流程、在这个过程中,免不了由于网络抖动等等各种原因造成数据的延迟到达、本来应该先来的数据迟到了,这种情况怎么处理呢,Flink 的 watermark 机制来帮你处理。
Watermark 可以简单的理解为,通过设置一个可以接受的延迟时间,如果数据在规定时间还没到达, Flink 会等待一段时间,然后等数据到达了再触发计算,但由于是流处理,肯定不能无限制的等下去,对于超过了设置的等待时间还没到达的数据,那么只能抛弃或者存到另一个流里面用别的逻辑来处理了。
在三台 Linux 服务器上搭建 MongoDB 分布式集群,采用副本集+分片集群方式进行部署,并设置密码认证,以实现生产环境上的高可用。
node1 实例 | node2 实例 | node3 实例 | 端口 | 副本集 |
---|---|---|---|---|
mongos | mongos | mongos | 27014 | |
config | config | config | 27015 | config |
shard1 主 | shard1 副 | shard1 仲裁 | 27016 | shard1 |
shard2 仲裁 | shard2 主 | shard2 副 | 27002 | shard2 |
shard3 副 | shard3仲 裁 | shard3 主 | 27003 | shard3 |
注意:以下操作在三台服务器上都要执行一次。
准备镜像
镜像地址:https://hub.docker.com/_/mongo?tab=tags&page=1&ordering=last_updated
docker pull mongo
建立数据目录
sudo mkdir -p /tmp/mongodb_config
sudo mkdir -p /tmp/mongodb_shard1
sudo mkdir -p /tmp/mongodb_shard2
sudo mkdir -p /tmp/mongodb_shard3
sudo mkdir -p /tmp/mongodb_mongos
sudo mkdir -p /mongodb/keyfile
sudo mkdir -p /mongodb/mongos/log
sudo mkdir -p /mongodb/config/log
sudo mkdir -p /mongodb/config/data
sudo mkdir -p /mongodb/shard1/log
sudo mkdir -p /mongodb/shard1/data
sudo mkdir -p /mongodb/shard2/log
sudo mkdir -p /mongodb/shard2/data
sudo mkdir -p /mongodb/shard3/log
sudo mkdir -p /mongodb/shard3/data
目录说明:
生成认证文件
sudo openssl rand -base64 756 -out /mongodb/keyfile/mongodb.key
sudo chmod 400 /mongodb/keyfile/mongodb.key
由于认证文件在每台服务器上必须相同,所以可以使用以下命令将文件复制到其他服务器:
sudo scp /mongodb/keyfile/mongodb.key root@node2:/mongodb/keyfile
配置 Config Server
关于 MongoDB 各配置参数信息详见:Configuration File Options
编辑生成 Config Server 配置文件 config.conf 至 /opt/mongodb
目录
systemLog:
destination: file
logAppend: true
path: "/mongodb/config/log/config.log"
storage:
dbPath: /mongodb/config/data
journal:
enabled: true
wiredTiger:
engineConfig:
directoryForIndexes: true
processManagement:
timeZoneInfo: /usr/share/zoneinfo
net:
port: 27015
bindIpAll: true
maxIncomingConnections: 65536
unixDomainSocket:
enabled: true
pathPrefix: /tmp/mongodb_config
filePermissions: 0700
security:
keyFile: /mongodb/keyfile/mongodb.key
clusterAuthMode: keyFile
authorization: enabled
javascriptEnabled: true
replication:
replSetName: config
sharding:
clusterRole: configsvr
配置 Shard1 Server
编辑生成 Shard1 Server 配置文件 shard1.conf 至 /opt/mongodb
目录
systemLog:
destination: file
logAppend: true
path: "/mongodb/shard1/log/mongodb.log"
storage:
dbPath: /mongodb/shard1/data
journal:
enabled: true
wiredTiger:
engineConfig:
directoryForIndexes: true
processManagement:
timeZoneInfo: /usr/share/zoneinfo
net:
port: 27016
bindIpAll: true
maxIncomingConnections: 65536
unixDomainSocket:
enabled: true
pathPrefix: /tmp/mongodb_shard1
filePermissions: 0700
security:
keyFile: /mongodb/keyfile/mongodb.key
clusterAuthMode: keyFile
authorization: enabled
javascriptEnabled: true
replication:
replSetName: shard1
sharding:
clusterRole: shardsvr
配置 Shard2 Server
编辑生成 Shard2 Server 配置文件 shard2.conf 至 /opt/mongodb
目录
systemLog:
destination: file
logAppend: true
path: "/mongodb/shard2/log/mongodb.log"
storage:
dbPath: /mongodb/shard2/data
journal:
enabled: true
wiredTiger:
engineConfig:
directoryForIndexes: true
processManagement:
timeZoneInfo: /usr/share/zoneinfo
net:
port: 27017
bindIpAll: true
maxIncomingConnections: 65536
unixDomainSocket:
enabled: true
pathPrefix: /tmp/mongodb_shard2
filePermissions: 0700
security:
keyFile: /mongodb/keyfile/mongodb.key
clusterAuthMode: keyFile
authorization: enabled
javascriptEnabled: true
replication:
replSetName: shard2
sharding:
clusterRole: shardsvr
配置 Shard3 Server
编辑生成 Shard3 Server 配置文件 shard3.conf 至 /opt/mongodb
目录
systemLog:
destination: file
logAppend: true
path: "/mongodb/shard3/log/mongodb.log"
storage:
dbPath: /mongodb/shard3/data
journal:
enabled: true
wiredTiger:
engineConfig:
directoryForIndexes: true
processManagement:
timeZoneInfo: /usr/share/zoneinfo
net:
port: 27018
bindIpAll: true
maxIncomingConnections: 65536
unixDomainSocket:
enabled: true
pathPrefix: /tmp/mongodb_shard3
filePermissions: 0700
security:
keyFile: /mongodb/keyfile/mongodb.key
clusterAuthMode: keyFile
authorization: enabled
javascriptEnabled: true
replication:
replSetName: shard3
sharding:
clusterRole: shardsvr
配置 Mongos Server
编辑生成 Mongos Server 配置文件 mongos.conf 至 /opt/mongodb
目录
systemLog:
destination: file
logAppend: true
path: /mongodb/mongos/log/mongodb.log
processManagement:
timeZoneInfo: /usr/share/zoneinfo
net:
port: 27014
bindIpAll: true
maxIncomingConnections: 65536
unixDomainSocket:
enabled: true
pathPrefix: /tmp/mongodb_mongos
filePermissions: 0700
security:
keyFile: /mongodb/keyfile/mongodb.key
clusterAuthMode: keyFile
replication:
localPingThresholdMs: 15
sharding:
configDB: config/node1:27015,node2:27015,node3:27015
启动各服务
sudo docker run -d -p 27015:27015 --name mongo-configsvr --entrypoint "mongod" -v /opt/mongodb/config.conf:/opt/mongodb/config.conf -v /mongodb/keyfile/mongodb.key:/mongodb/keyfile/mongodb.key -v /mongodb/config/log/config.log:/mongodb/config/log/config.log -v /mongodb/config/data:/mongodb/config/data -v /tmp/mongodb_config:/tmp/mongodb_config a.newegg.org/newegg-docker/mongo -f /opt/mongodb/config.conf
sudo docker run -d -p 27016:27016 --name mongo-shard1 --entrypoint "mongod" -v /opt/mongodb/shard1.conf:/opt/mongodb/shard1.conf -v /mongodb/keyfile/mongodb.key:/mongodb/keyfile/mongodb.key -v /mongodb/shard1/log/mongodb.log:/mongodb/shard1/log/mongodb.log -v /mongodb/shard1/data:/mongodb/shard1/data -v /tmp/mongodb_shard1:/tmp/mongodb_shard1 a.newegg.org/newegg-docker/mongo -f /opt/mongodb/shard1.conf
sudo docker run -d -p 27017:27017 --name mongo-shard2 --entrypoint "mongod" -v /opt/mongodb/shard2.conf:/opt/mongodb/shard2.conf -v /mongodb/keyfile/mongodb.key:/mongodb/keyfile/mongodb.key -v /mongodb/shard2/log/mongodb.log:/mongodb/shard2/log/mongodb.log -v /mongodb/shard2/data:/mongodb/shard2/data -v /tmp/mongodb_shard2:/tmp/mongodb_shard2 a.newegg.org/newegg-docker/mongo -f /opt/mongodb/shard2.conf
sudo docker run -d -p 27018:27018 --name mongo-shard3 --entrypoint "mongod" -v /opt/mongodb/shard3.conf:/opt/mongodb/shard3.conf -v /mongodb/keyfile/mongodb.key:/mongodb/keyfile/mongodb.key -v /mongodb/shard3/log/mongodb.log:/mongodb/shard3/log/mongodb.log -v /mongodb/shard3/data:/mongodb/shard3/data -v /tmp/mongodb_shard3:/tmp/mongodb_shard3 a.newegg.org/newegg-docker/mongo --config /opt/mongodb/shard3.conf
sudo docker run --name mongos -d -p 27014:27014 --entrypoint "mongos" -v /opt/mongodb/mongos.conf:/opt/mongodb/mongos.conf -v /mongodb/keyfile/mongodb.key:/mongodb/keyfile/mongodb.key -v /mongodb/mongos/log/mongodb.log:/mongodb/mongos/log/mongodb.log -v /tmp/mongodb_mongos:/tmp/mongodb_mongos a.newegg.org/newegg-docker/mongo -f /opt/mongodb/mongos.conf
注意:以下操作在任一台服务器上执行一次即可。
配置 Config 分片
sudo docker exec -it mongo-configsvr bash
mongo --port 27015
rs.initiate({
"_id":"config",
"members":[{
"_id":0,
"host":"node1:27015"
},{
"_id":1,
"host":"node2:27015"
},{
"_id":2,
"host":"node3:27015"
}]})
配置 Shard1 分片
sudo docker exec -it mongo-shard1 bash
mongo --port 27016
rs.initiate({
_id:"shard1",
members:[{
_id:0,
host:"node1:27016"
},{
_id:1,
host:"node2:27016"
},{
_id:2,
host:"node3:27016",
arbiterOnly:true
}]})
配置 Shard2 分片
sudo docker exec -it mongo-shard2 bash
mongo --port 27017
rs.initiate({
_id:"shard2",
members:[{
_id:0,
host:"node1:27017",
arbiterOnly:true
},{
_id:1,
host:"node2:27017"
},{
_id:2,
host:"node3:27017"
}]})
配置 Shard3 分片
sudo docker exec -it mongo-shard3 bash
mongo --port 27018
rs.initiate({
_id:"shard1",
members:[{
_id:0,
host:"node1:27018"
},{
_id:1,
host:"node2:27018",
arbiterOnly:true
},{
_id:2,
host:"node3:27018"
}]})
执行 rs.status() 可以查看状态。
注意: 对于 Shard1,Shard2,Shard3 分片的配置,如果在同一台服务器上不能一起配置,可以换另一台服务器配置。
配置 Mongos 分片
sudo docker exec -it mongos bash
mongo --port 27014
use admin
sh.addShard("shard1/node1:27016,node2:27016,node3:27016")
sh.addShard("shard2/node1:27017,node2:27017,node3:27017")
sh.addShard("shard3/node1:27018,node2:27018,node3:27018")
sh.status()
// 创建管理员账号
db.createUser({user:"admin",pwd:"admin",roles:[{role:"root",db:"admin"}]})
// 登录
mongo node1:27014 -u admin -p admin --authenticationDatabase admin
// 创建测试数据库
sh.enableSharding("mytest")
sh.shardCollection("mytest.coll1",{"age":1})
// 插入数据
for(var i=1;i<=1000;i++){
db.coll1.insert({
name:i,
age:Math.round(Math.random() * 100),
score1:Math.round(Math.random() * 100),
score2:Math.round(Math.random() * 100),
score3:Math.round(Math.random() * 100),
score4:Math.round(Math.random() * 100),
score5:Math.round(Math.random() * 100)
});
}
db.coll1.find().count()
db.coll1.find().limit(1)
启动时存在权限异常
使用 Docker 启动 MongoDB 时,如果出现类似 Permisson not support
异常,执行以下命令,提升文件或目录权限:
chown 999:999 <文件 或 目录>
确定集群的大小很显然是决定于多种因素的,例如应用场景,应用的规模,以及特定的服务等级协议(SLA,如宕机时间、最大吞吐、能接受的延迟)。
确定 Flink 集群大小需要考虑的几个因素:
确定集群大小的首要事情就是估算预期进入流计算系统的每秒记录数(也就是我们常说的吞吐量),以及每条记录的大小。不同的记录类型会有不同的大小,这将最终影响 Flink 应用程序平稳运行所需的资源。
应用程序中不同 key 的数量和每个 key 所需要存储的 state 大小,都将影响到 Flink 应用程序所需的资源,从而能高效地运行,避免任何反压。
第三个考虑因素是状态的更新频率,因为状态的更新通常是一个高消耗的动作。而不同的状态后端(如 RocksDB,Java Heap)的访问模式差异很大,RocksDB 的每次读取和更新都会涉及序列化和反序列化以及 JNI 操作,而 Java Heap 的状态后端不支持增量 checkpoint,导致大状态场景需要每次持久化的数据量较大。这些因素都会显著地影响集群的大小和 Flink 作业所需的资源。
网络容量不仅仅会收到 Flink 应用程序本身的影响,也会受到可能正在交互的 Kafka、HDFS 等外部服务的影响。这些外部服务可能会导致额外的网络流量。例如,启用 replication 可能会在网络的消息 broker 之间产生额外的流量。
如果你的应用程序依赖了基于磁盘的状态后端,如 RocksDB,或者考虑使用 Kafka 或 HDFS,那么磁盘的带宽也需要纳入考虑。
最后但并非最不重要的,在开始应用部署前,你需要考虑集群中可用机器的数量及其可用的 CPU 和内存。这最终确保了在将应用程序投入生产之后,集群有充足的处理能力。
如果处理一个事件(或一条数据)的结果只跟事件本身的内容有关,称为无状态处理;
如果处理一个事件(或一条数据)的结果和之前处理过的事件有关,称为有状态处理。比如,基本的聚合,数据流之间的关联都是有状态处理。
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.