Giter Site home page Giter Site logo

bigdata-notes's Introduction

bigdata-notes's People

Contributors

heibaiying avatar lsloveyj avatar pengchen211 avatar weijiew avatar yifanzheng avatar

Stargazers

 avatar  avatar

Watchers

 avatar

bigdata-notes's Issues

Flink:如何保证 Exactly-Once(精确一次)语义(容错机制和故障恢复)

Flink 应用程序内部的 Exactly-Once 语义

Flink 的 checkpoint(检查点) 和故障恢复算法保证了故障发生后应用状态的一致性。Flink 以固定的时间间隔生成 checkpoint,然后将 checkpoint 存储(异步发生的)在一个分布式的持久化系统中。如果发生故障,Flink 就会从最近一次成功的 checkpoint 中将应用程序的状态进行恢复,并从恢复的读取点继续处理,就好像什么事情没有发生一样。Flink 的状态存储在 Flink 的内部,这样做的好处就是不再依赖外部系统,降低了对外部系统的依赖性。

Flink 的端到端 Exactly-Once 语义

Flink 利用两阶段提交协议提供端到端的 Exactly-Once 语义。这意味着,外部系统必须提供提交或回滚的方法,然后通过 Flink 的检查点机制来协调。

Flink checkpoint 的启动表示两阶段提交协议的预提交阶段。当 checkpoint 启动时,Flink JobManager 会从数据源产生 checkpoint barrier。checkpoint barrier 在算子之间传递,对于每个算子,它会触发算子状态后端生成状态的快照。当一个进程具有外部状态(External state)时,状态处理会有所不同。外部状态通常以写入外部系统(如 Kafka)的形式出现。在这种情况下,在预提交阶段,除了将其状态写入状态后端之外,sink 还必须预先提交其外部事务。

当 checkpoint barrier 通过所有算子并且触发的快照回调成功完成时,预提交阶段结束。所有触发的状态快照都被视为该检查点的一部分。检查点是整个应用程序状态的快照,包括预先提交的外部状态。如果发生故障,我们可以回滚到上次成功完成快照的时间点。

下一步是通知所有算子检查点已成功完成。这是两阶段提交协议的提交阶段,JobManager 为应用程序中的每个算子发出检查点完成的回调。

Flink:Watermark

在真实的生产环境中,数据的传输会经过很多流程、在这个过程中,免不了由于网络抖动等等各种原因造成数据的延迟到达、本来应该先来的数据迟到了,这种情况怎么处理呢,Flink 的 watermark 机制来帮你处理。

Watermark 可以简单的理解为,通过设置一个可以接受的延迟时间,如果数据在规定时间还没到达, Flink 会等待一段时间,然后等数据到达了再触发计算,但由于是流处理,肯定不能无限制的等下去,对于超过了设置的等待时间还没到达的数据,那么只能抛弃或者存到另一个流里面用别的逻辑来处理了。

MongoDB:使用 Docker 搭建 MongoDB (副本集+分片集)集群

MongoDB 集群搭建

在三台 Linux 服务器上搭建 MongoDB 分布式集群,采用副本集+分片集群方式进行部署,并设置密码认证,以实现生产环境上的高可用。

集群规划

node1 实例 node2 实例 node3 实例 端口 副本集
mongos mongos mongos 27014  
config config config 27015 config
shard1 主 shard1 副 shard1 仲裁 27016 shard1
shard2 仲裁 shard2 主 shard2 副 27002 shard2
shard3 副 shard3仲 裁 shard3 主 27003 shard3

安装部署

注意:以下操作在三台服务器上都要执行一次。

准备镜像

镜像地址:https://hub.docker.com/_/mongo?tab=tags&page=1&ordering=last_updated

docker pull mongo

建立数据目录

sudo mkdir -p /tmp/mongodb_config
sudo mkdir -p /tmp/mongodb_shard1
sudo mkdir -p /tmp/mongodb_shard2
sudo mkdir -p /tmp/mongodb_shard3
sudo mkdir -p /tmp/mongodb_mongos
sudo mkdir -p /mongodb/keyfile
sudo mkdir -p /mongodb/mongos/log
sudo mkdir -p /mongodb/config/log
sudo mkdir -p /mongodb/config/data
sudo mkdir -p /mongodb/shard1/log
sudo mkdir -p /mongodb/shard1/data
sudo mkdir -p /mongodb/shard2/log
sudo mkdir -p /mongodb/shard2/data
sudo mkdir -p /mongodb/shard3/log
sudo mkdir -p /mongodb/shard3/data

目录说明:

  • /tmp/mongodb_xxx:用于存放 unix socket
  • /mongodb/keyfile:用于存放认证文件
  • /mongdb/xxx/log:用于存放日志文件
  • /mongodb/xxx/data:用于存放数据文件

生成认证文件

sudo openssl rand -base64 756 -out /mongodb/keyfile/mongodb.key
sudo chmod 400 /mongodb/keyfile/mongodb.key

由于认证文件在每台服务器上必须相同,所以可以使用以下命令将文件复制到其他服务器:

sudo scp /mongodb/keyfile/mongodb.key root@node2:/mongodb/keyfile

配置 Config Server

关于 MongoDB 各配置参数信息详见:Configuration File Options

编辑生成 Config Server 配置文件 config.conf 至 /opt/mongodb 目录

systemLog:
 destination: file
 logAppend: true
 path: "/mongodb/config/log/config.log"

storage:
 dbPath: /mongodb/config/data
 journal:
  enabled: true
 wiredTiger:
  engineConfig:
   directoryForIndexes: true

processManagement:
 timeZoneInfo: /usr/share/zoneinfo

net:
 port: 27015
 bindIpAll: true
 maxIncomingConnections: 65536
 unixDomainSocket:
  enabled: true
  pathPrefix: /tmp/mongodb_config
  filePermissions: 0700

security:
 keyFile: /mongodb/keyfile/mongodb.key
 clusterAuthMode: keyFile
 authorization: enabled
 javascriptEnabled: true

replication:
 replSetName: config
sharding:
 clusterRole: configsvr

配置 Shard1 Server

编辑生成 Shard1 Server 配置文件 shard1.conf 至 /opt/mongodb 目录

systemLog:
 destination: file
 logAppend: true
 path: "/mongodb/shard1/log/mongodb.log"

storage:
 dbPath: /mongodb/shard1/data
 journal:
  enabled: true
 wiredTiger:
  engineConfig:
   directoryForIndexes: true     

processManagement:
 timeZoneInfo: /usr/share/zoneinfo

net:
 port: 27016
 bindIpAll: true
 maxIncomingConnections: 65536
 unixDomainSocket:
  enabled: true
  pathPrefix: /tmp/mongodb_shard1
  filePermissions: 0700

security:
 keyFile: /mongodb/keyfile/mongodb.key
 clusterAuthMode: keyFile
 authorization: enabled
 javascriptEnabled: true

replication:
 replSetName: shard1
sharding:
 clusterRole: shardsvr

配置 Shard2 Server

编辑生成 Shard2 Server 配置文件 shard2.conf 至 /opt/mongodb 目录

systemLog:
 destination: file
 logAppend: true
 path: "/mongodb/shard2/log/mongodb.log"

storage:
 dbPath: /mongodb/shard2/data
 journal:
  enabled: true
 wiredTiger:
  engineConfig:
   directoryForIndexes: true     
processManagement:
 timeZoneInfo: /usr/share/zoneinfo

net:
 port: 27017
 bindIpAll: true
 maxIncomingConnections: 65536
 unixDomainSocket:
  enabled: true
  pathPrefix: /tmp/mongodb_shard2
  filePermissions: 0700

security:
 keyFile: /mongodb/keyfile/mongodb.key
 clusterAuthMode: keyFile
 authorization: enabled
 javascriptEnabled: true

replication:
 replSetName: shard2
sharding:
 clusterRole: shardsvr

配置 Shard3 Server

编辑生成 Shard3 Server 配置文件 shard3.conf 至 /opt/mongodb 目录

systemLog:
 destination: file
 logAppend: true
 path: "/mongodb/shard3/log/mongodb.log"

storage:
 dbPath: /mongodb/shard3/data
 journal:
  enabled: true
 wiredTiger:
  engineConfig:
   directoryForIndexes: true     

processManagement:
 timeZoneInfo: /usr/share/zoneinfo

net:
 port: 27018
 bindIpAll: true
 maxIncomingConnections: 65536
 unixDomainSocket:
  enabled: true
  pathPrefix: /tmp/mongodb_shard3
  filePermissions: 0700

security:
 keyFile: /mongodb/keyfile/mongodb.key
 clusterAuthMode: keyFile
 authorization: enabled
 javascriptEnabled: true

replication:
 replSetName: shard3
sharding:
 clusterRole: shardsvr

配置 Mongos Server

编辑生成 Mongos Server 配置文件 mongos.conf 至 /opt/mongodb 目录

systemLog:
 destination: file
 logAppend: true
 path: /mongodb/mongos/log/mongodb.log

processManagement:
 timeZoneInfo: /usr/share/zoneinfo

net:
 port: 27014
 bindIpAll: true
 maxIncomingConnections: 65536
 unixDomainSocket:
  enabled: true
  pathPrefix: /tmp/mongodb_mongos
  filePermissions: 0700

security:
 keyFile: /mongodb/keyfile/mongodb.key
 clusterAuthMode: keyFile

replication:
 localPingThresholdMs: 15

sharding:
 configDB: config/node1:27015,node2:27015,node3:27015

启动各服务

  • 启动 Config Server
sudo docker run -d -p 27015:27015 --name mongo-configsvr --entrypoint "mongod" -v /opt/mongodb/config.conf:/opt/mongodb/config.conf -v /mongodb/keyfile/mongodb.key:/mongodb/keyfile/mongodb.key -v /mongodb/config/log/config.log:/mongodb/config/log/config.log -v /mongodb/config/data:/mongodb/config/data -v /tmp/mongodb_config:/tmp/mongodb_config a.newegg.org/newegg-docker/mongo -f /opt/mongodb/config.conf
  • 启动 Shard1 Server
sudo docker run -d -p 27016:27016 --name mongo-shard1 --entrypoint "mongod" -v /opt/mongodb/shard1.conf:/opt/mongodb/shard1.conf -v /mongodb/keyfile/mongodb.key:/mongodb/keyfile/mongodb.key -v /mongodb/shard1/log/mongodb.log:/mongodb/shard1/log/mongodb.log -v /mongodb/shard1/data:/mongodb/shard1/data -v /tmp/mongodb_shard1:/tmp/mongodb_shard1 a.newegg.org/newegg-docker/mongo -f /opt/mongodb/shard1.conf
  • 启动 Shard2 Server
sudo docker run -d -p 27017:27017 --name mongo-shard2 --entrypoint "mongod" -v /opt/mongodb/shard2.conf:/opt/mongodb/shard2.conf -v /mongodb/keyfile/mongodb.key:/mongodb/keyfile/mongodb.key -v /mongodb/shard2/log/mongodb.log:/mongodb/shard2/log/mongodb.log -v /mongodb/shard2/data:/mongodb/shard2/data -v /tmp/mongodb_shard2:/tmp/mongodb_shard2 a.newegg.org/newegg-docker/mongo -f /opt/mongodb/shard2.conf
  • 启动 Shard3 Server
sudo docker run -d -p 27018:27018 --name mongo-shard3 --entrypoint "mongod" -v /opt/mongodb/shard3.conf:/opt/mongodb/shard3.conf -v /mongodb/keyfile/mongodb.key:/mongodb/keyfile/mongodb.key -v /mongodb/shard3/log/mongodb.log:/mongodb/shard3/log/mongodb.log -v /mongodb/shard3/data:/mongodb/shard3/data -v /tmp/mongodb_shard3:/tmp/mongodb_shard3 a.newegg.org/newegg-docker/mongo --config /opt/mongodb/shard3.conf
  • 启动 Mongos Server
sudo docker run --name mongos -d -p 27014:27014 --entrypoint "mongos" -v /opt/mongodb/mongos.conf:/opt/mongodb/mongos.conf -v /mongodb/keyfile/mongodb.key:/mongodb/keyfile/mongodb.key -v /mongodb/mongos/log/mongodb.log:/mongodb/mongos/log/mongodb.log  -v /tmp/mongodb_mongos:/tmp/mongodb_mongos a.newegg.org/newegg-docker/mongo -f /opt/mongodb/mongos.conf

配置分片集

注意:以下操作在任一台服务器上执行一次即可。

配置 Config 分片

sudo docker exec -it mongo-configsvr bash

mongo --port 27015

rs.initiate({
"_id":"config",
"members":[{
	"_id":0,
       "host":"node1:27015"
},{
	"_id":1,
	"host":"node2:27015"
},{
	"_id":2,
	"host":"node3:27015"
}]})

配置 Shard1 分片

sudo docker exec -it mongo-shard1 bash

mongo --port 27016

rs.initiate({
_id:"shard1",
members:[{
	_id:0,
       host:"node1:27016"
},{
	_id:1,
	host:"node2:27016"
},{
	_id:2,
	host:"node3:27016",
	arbiterOnly:true
}]})

配置 Shard2 分片

sudo docker exec -it mongo-shard2 bash

mongo --port 27017

rs.initiate({
_id:"shard2",
members:[{
	_id:0,
       host:"node1:27017",
       arbiterOnly:true
},{
	_id:1,
	host:"node2:27017"
},{
	_id:2,
	host:"node3:27017"
}]})

配置 Shard3 分片

sudo docker exec -it mongo-shard3 bash

mongo --port 27018

rs.initiate({
_id:"shard1",
members:[{
	_id:0,
       host:"node1:27018"
},{
	_id:1,
	host:"node2:27018",
        arbiterOnly:true
},{
	_id:2,
	host:"node3:27018"
}]})

执行 rs.status() 可以查看状态。

注意: 对于 Shard1,Shard2,Shard3 分片的配置,如果在同一台服务器上不能一起配置,可以换另一台服务器配置。

配置 Mongos 分片

sudo docker exec -it mongos bash

mongo --port 27014

use admin

sh.addShard("shard1/node1:27016,node2:27016,node3:27016")
sh.addShard("shard2/node1:27017,node2:27017,node3:27017")
sh.addShard("shard3/node1:27018,node2:27018,node3:27018")

sh.status()

测试

// 创建管理员账号
db.createUser({user:"admin",pwd:"admin",roles:[{role:"root",db:"admin"}]})

// 登录
mongo node1:27014 -u admin -p admin  --authenticationDatabase admin

// 创建测试数据库
sh.enableSharding("mytest")
sh.shardCollection("mytest.coll1",{"age":1})

//  插入数据
for(var i=1;i<=1000;i++){
db.coll1.insert({
name:i,
age:Math.round(Math.random() * 100),
score1:Math.round(Math.random() * 100),
score2:Math.round(Math.random() * 100),
score3:Math.round(Math.random() * 100),
score4:Math.round(Math.random() * 100),
score5:Math.round(Math.random() * 100)
});
}

db.coll1.find().count()
db.coll1.find().limit(1)

常见异常

启动时存在权限异常

使用 Docker 启动 MongoDB 时,如果出现类似 Permisson not support 异常,执行以下命令,提升文件或目录权限:

chown 999:999 <文件 或 目录>

参考

  1. http://ifso.cn/post/mongodb-cluster/
  2. https://github.com/yekoufeng/docker_mongodb_rs

Flink:确定 Flink 作业所需资源大小时需考虑的 6 件事

确定集群的大小很显然是决定于多种因素的,例如应用场景,应用的规模,以及特定的服务等级协议(SLA,如宕机时间、最大吞吐、能接受的延迟)。

确定 Flink 集群大小需要考虑的几个因素:

  • 记录数和每条记录的大小

确定集群大小的首要事情就是估算预期进入流计算系统的每秒记录数(也就是我们常说的吞吐量),以及每条记录的大小。不同的记录类型会有不同的大小,这将最终影响 Flink 应用程序平稳运行所需的资源。

  • 不同 key 的数量和每个 key 存储的 state 大小

应用程序中不同 key 的数量和每个 key 所需要存储的 state 大小,都将影响到 Flink 应用程序所需的资源,从而能高效地运行,避免任何反压。

  • 状态的更新频率和状态后端的访问模式

第三个考虑因素是状态的更新频率,因为状态的更新通常是一个高消耗的动作。而不同的状态后端(如 RocksDB,Java Heap)的访问模式差异很大,RocksDB 的每次读取和更新都会涉及序列化和反序列化以及 JNI 操作,而 Java Heap 的状态后端不支持增量 checkpoint,导致大状态场景需要每次持久化的数据量较大。这些因素都会显著地影响集群的大小和 Flink 作业所需的资源。

  • 网络容量

网络容量不仅仅会收到 Flink 应用程序本身的影响,也会受到可能正在交互的 Kafka、HDFS 等外部服务的影响。这些外部服务可能会导致额外的网络流量。例如,启用 replication 可能会在网络的消息 broker 之间产生额外的流量。

  • 磁盘带宽

如果你的应用程序依赖了基于磁盘的状态后端,如 RocksDB,或者考虑使用 Kafka 或 HDFS,那么磁盘的带宽也需要纳入考虑。

  • 机器数量及其可用 CPU 和内存

最后但并非最不重要的,在开始应用部署前,你需要考虑集群中可用机器的数量及其可用的 CPU 和内存。这最终确保了在将应用程序投入生产之后,集群有充足的处理能力。

Flink:无状态处理与有状态处理

如果处理一个事件(或一条数据)的结果只跟事件本身的内容有关,称为无状态处理;

如果处理一个事件(或一条数据)的结果和之前处理过的事件有关,称为有状态处理。比如,基本的聚合,数据流之间的关联都是有状态处理。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.