A Deployment Guide for a Highly Available Lakehouse Architecture on Kubernetes
I. Architecture Overview
This guide describes, step by step, how to deploy a complete, highly available lakehouse architecture on Kubernetes. The architecture integrates the following core components:
- CentOS 8.5: base operating system environment
- Kafka 3.0: distributed message queue for data ingestion and transport
- Flink 1.18: real-time stream processing engine for data transformation and computation
- Paimon 0.7: open data lake storage format providing the unified lake storage layer
- Doris 2.1.6: MPP analytical database serving as the high-performance data warehouse
Design principles:
- High availability: every component runs with multiple replicas to avoid single points of failure
- Elastic scaling: Kubernetes auto-scaling adjusts resources dynamically with load
- Data consistency: transactional mechanisms and replica synchronization keep data consistent
- Unified metadata: Hive Metastore acts as the central metadata service
- Resource isolation: Kubernetes namespaces provide multi-tenant resource isolation
II. Environment Preparation
1. Kubernetes cluster requirements
(1) Kubernetes version: 1.23+
(2) Node count: at least 5 nodes (3 masters, 2 workers)
(3) Node specifications:
- Master nodes: 4 CPU cores, 8 GB RAM, 50 GB storage
- Worker nodes: 8 CPU cores, 32 GB RAM, 200 GB storage
(4) Network plugin: Calico or Flannel
(5) Storage plugin: dynamic volume provisioning support (e.g. NFS, CephFS, or cloud storage)
2. Base software installation
Install the base software on every node:
# Update the system
sudo dnf update -y
# Install required tools
sudo dnf install -y git wget curl vim net-tools
# Install Docker (container runtime)
sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
sudo dnf install -y docker-ce docker-ce-cli containerd.io
sudo systemctl enable --now docker
# Configure a Docker registry mirror
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://mirror.ccs.tencentyun.com"],
"exec-opts": ["native.cgroupdriver=systemd"],
"log-driver": "json-file",
"log-opts": {
"max-size": "100m"
},
"storage-driver": "overlay2"
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
3. Kubernetes cluster initialization
Run the following on the master node:
# Initialize the Kubernetes cluster
sudo kubeadm init --pod-network-cidr=10.244.0.0/16 --control-plane-endpoint=k8s-master
# Configure kubectl
mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config
# Install the network plugin (Flannel)
kubectl apply -f https://raw.githubusercontent.com/flannel-io/flannel/master/Documentation/kube-flannel.yml
# Allow pods to be scheduled on control-plane nodes (for test environments)
kubectl taint nodes --all node-role.kubernetes.io/control-plane-
4. Helm package manager installation
# Download and install Helm
wget https://get.helm.sh/helm-v3.10.0-linux-amd64.tar.gz
tar -zxvf helm-v3.10.0-linux-amd64.tar.gz
sudo mv linux-amd64/helm /usr/local/bin/helm
5. Create namespaces
kubectl create namespace lakehouse
kubectl create namespace monitoring
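You can confirm both namespaces exist before moving on:
# Verify the namespaces were created
kubectl get namespace lakehouse monitoring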
III. Storage System Deployment
1. NFS storage deployment
# Install the NFS client on all nodes
sudo dnf install -y nfs-utils
# Set up the NFS server (on a dedicated node, or use cloud storage instead)
sudo mkdir -p /data/nfs
sudo chmod -R 777 /data/nfs
echo "/data/nfs *(rw,sync,no_root_squash)" | sudo tee -a /etc/exports
sudo exportfs -a
sudo systemctl enable --now nfs-server
# Create the StorageClass
cat <<EOF | kubectl apply -f -
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: nfs-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer
EOF
# Create the PV and the PVC (replace <nfs-server-ip> with the NFS server address)
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfs-pv
spec:
capacity:
storage: 100Gi
accessModes:
- ReadWriteMany
nfs:
server: <nfs-server-ip>
path: "/data/nfs"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: nfs-pvc
namespace: lakehouse
spec:
accessModes:
- ReadWriteMany
storageClassName: nfs-storage
resources:
requests:
storage: 100Gi
EOF
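Once the PV and PVC are applied, it is worth checking that the claim binds; with WaitForFirstConsumer it may stay Pending until the first pod that uses it is scheduled.
# Check PV/PVC status; the PVC may remain Pending until a consuming pod is scheduled
kubectl get pv nfs-pv
kubectl get pvc nfs-pvc -n lakehouse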
2. HDFS deployment (optional)
If HDFS is used as Paimon's underlying storage:
# Deploy HDFS with Helm
helm repo add apache https://apache.github.io/hadoop-charts
helm install hdfs apache/hadoop --namespace lakehouse \
--set core.service.type=NodePort \
--set hdfs.nameNode.replicas=3 \
--set hdfs.dataNode.replicas=3 \
--set persistence.enabled=true \
--set persistence.storageClass=nfs-storage \
--set persistence.size=100Gi
IV. Message Queue: Kafka Deployment
1. ZooKeeper deployment
# Create the ZooKeeper Service and StatefulSet
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
name: zookeeper-service
namespace: lakehouse
labels:
app: zookeeper
spec:
ports:
- name: client
port: 2181
protocol: TCP
- name: follower
port: 2888
protocol: TCP
- name: leader
port: 3888
protocol: TCP
clusterIP: None
selector:
app: zookeeper
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: zookeeper
namespace: lakehouse
spec:
serviceName: zookeeper-service
replicas: 3
selector:
matchLabels:
app: zookeeper
template:
metadata:
labels:
app: zookeeper
spec:
containers:
- name: zookeeper
image: confluentinc/cp-zookeeper:7.3.0
ports:
- containerPort: 2181
name: client
- containerPort: 2888
name: follower
- containerPort: 3888
name: leader
env:
- name: ZOOKEEPER_CLIENT_PORT
value: "2181"
- name: ZOOKEEPER_TICK_TIME
value: "2000"
- name: ZOOKEEPER_INIT_LIMIT
value: "5"
- name: ZOOKEEPER_SYNC_LIMIT
value: "2"
- name: ZOOKEEPER_SERVERS
value: "zookeeper-0.zookeeper-service:2888:3888;zookeeper-1.zookeeper-service:2888:3888;zookeeper-2.zookeeper-service:2888:3888"
volumeMounts:
- name: data
mountPath: /var/lib/zookeeper/data
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: nfs-storage
resources:
requests:
storage: 10Gi
EOF
2. Kafka cluster deployment
# Create the Kafka Service and StatefulSet
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
name: kafka-service
namespace: lakehouse
labels:
app: kafka
spec:
ports:
- name: kafka
port: 9092
protocol: TCP
clusterIP: None
selector:
app: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
namespace: lakehouse
spec:
serviceName: kafka-service
replicas: 3
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
containers:
- name: kafka
image: confluentinc/cp-kafka:7.3.0
ports:
- containerPort: 9092
name: kafka
env:
- name: KAFKA_BROKER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: KAFKA_ZOOKEEPER_CONNECT
value: "zookeeper-0.zookeeper-service:2181,zookeeper-1.zookeeper-service:2181,zookeeper-2.zookeeper-service:2181"
- name: KAFKA_ADVERTISED_LISTENERS
value: "PLAINTEXT://kafka-0.kafka-service:9092,kafka-1.kafka-service:9092,kafka-2.kafka-service:9092"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "3"
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
value: "3"
- name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
value: "2"
- name: KAFKA_DEFAULT_REPLICATION_FACTOR
value: "3"
- name: KAFKA_MIN_IN_SYNC_REPLICAS
value: "2"
volumeMounts:
- name: data
mountPath: /var/lib/kafka/data
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: nfs-storage
resources:
requests:
storage: 100Gi
EOF
3. Kafka topic creation
# Launch a temporary Kafka client pod
kubectl run kafka-client --rm -ti --image confluentinc/cp-kafka:7.3.0 --namespace lakehouse -- bash
# Inside the pod, create the topics
kafka-topics --create --topic orders --bootstrap-server kafka-service:9092 --partitions 3 --replication-factor 3
kafka-topics --create --topic payments --bootstrap-server kafka-service:9092 --partitions 3 --replication-factor 3
kafka-topics --create --topic inventory --bootstrap-server kafka-service:9092 --partitions 3 --replication-factor 3
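To confirm the topics were created with the expected partition and replica layout, describe them from the same client pod:
# Verify the topic configuration (run inside the kafka-client pod)
kafka-topics --describe --topic orders --bootstrap-server kafka-service:9092
kafka-topics --list --bootstrap-server kafka-service:9092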
V. Real-Time Computing: Flink Deployment
1. Flink cluster configuration
# Create the Flink configuration ConfigMap
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
namespace: lakehouse
data:
flink-conf.yaml: |
jobmanager.rpc.address: flink-jobmanager
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 3200m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 3
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: flink-cluster
high-availability.storageDir: file:///flink/ha/
rest.port: 8081
kubernetes.rest-service.exposed.type: NodePort
kubernetes.jobmanager.cpu: 1.0
kubernetes.taskmanager.cpu: 2.0
kubernetes.container.image: flink:1.18.0-scala_2.12-java11
kubernetes.namespace: lakehouse
state.backend: rocksdb
state.checkpoints.dir: file:///flink/checkpoints
state.savepoints.dir: file:///flink/savepoints
execution.checkpointing.interval: 30s
execution.checkpointing.timeout: 10min
execution.checkpointing.min-pause: 5s
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.unaligned: true
execution.checkpointing.alignment-timeout: 30s
execution.checkpointing.tolerable-failed-checkpoints: 3
execution.checkpointing.mode: EXACTLY_ONCE
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10s
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9999
metrics.reporter.prom.interval: 30 SECONDS
log4j-console.properties: |
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
EOF
2. Flink JobManager deployment
# Create the Flink JobManager Service and Deployment
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
namespace: lakehouse
spec:
type: NodePort
ports:
- name: rpc
port: 6123
- name: blob
port: 6124
- name: ui
port: 8081
targetPort: 8081
nodePort: 30081
- name: prom
port: 9999
selector:
app: flink
component: jobmanager
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
namespace: lakehouse
spec:
replicas: 1
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: flink:1.18.0-scala_2.12-java11
args: ["jobmanager"]
ports:
- containerPort: 6123
- containerPort: 6124
- containerPort: 8081
- containerPort: 9999
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: flink-storage
mountPath: /flink
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: flink-storage
persistentVolumeClaim:
claimName: nfs-pvc
EOF
3. Flink TaskManager deployment
# Create the Flink TaskManager Deployment
cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
namespace: lakehouse
spec:
replicas: 3
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: flink:1.18.0-scala_2.12-java11
args: ["taskmanager"]
ports:
- containerPort: 6122
- containerPort: 9999
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: flink-storage
mountPath: /flink
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: flink-storage
persistentVolumeClaim:
claimName: nfs-pvc
EOF
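Before submitting jobs, check that the JobManager and the three TaskManagers are running and that the Web UI is reachable on the NodePort defined above:
# Verify the Flink cluster pods
kubectl get pods -l app=flink -n lakehouse
# The Flink Web UI is exposed via NodePort 30081
# http://<node-ip>:30081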
4. Flink SQL job submission
# Create the Flink SQL job and its ConfigMap
cat <<EOF | kubectl apply -f -
apiVersion: batch/v1
kind: Job
metadata:
name: flink-sql-job
namespace: lakehouse
spec:
template:
spec:
containers:
- name: flink-job
image: flink:1.18.0-scala_2.12-java11
command: ["/bin/sh", "-c"]
args:
- |
./bin/flink run \
-d \
-m flink-jobmanager:8081 \
-p 3 \
-ynm LakehouseJob \
-yD containerized.master.env.ENV_KAFKA_BOOTSTRAP="kafka-service:9092" \
-yD containerized.taskmanager.env.ENV_KAFKA_BOOTSTRAP="kafka-service:9092" \
-yD containerized.master.env.ENV_PAIMON_WAREHOUSE="/flink/paimon" \
-yD containerized.taskmanager.env.ENV_PAIMON_WAREHOUSE="/flink/paimon" \
./examples/sql/StreamSQLJob.jar \
--sql-file /flink/sql/lakehouse.sql
volumeMounts:
- name: flink-sql
mountPath: /flink/sql
- name: flink-storage
mountPath: /flink
restartPolicy: OnFailure
volumes:
- name: flink-sql
configMap:
name: flink-sql-config
- name: flink-storage
persistentVolumeClaim:
claimName: nfs-pvc
---
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-sql-config
namespace: lakehouse
data:
lakehouse.sql: |
-- Create the Kafka source table
CREATE TABLE orders (
order_id STRING,
customer_id STRING,
product_id STRING,
quantity INT,
unit_price DECIMAL(10,2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = '\${ENV_KAFKA_BOOTSTRAP}',
'properties.group.id' = 'flink-consumer',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
-- Create the Paimon sink table
CREATE TABLE orders_paimon (
order_id STRING,
customer_id STRING,
product_id STRING,
quantity INT,
unit_price DECIMAL(10,2),
order_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'paimon',
'warehouse' = 'file://\${ENV_PAIMON_WAREHOUSE}',
'table' = 'orders',
'file.format' = 'orc',
'write.buffer-size' = '256MB',
'write.buffer-spillable' = 'true',
'write-only' = 'false',
'merge-engine' = 'deduplicate',
'changelog-producer' = 'input',
'full-compaction.delta-commits' = '5'
);
-- Process the data and write it out
INSERT INTO orders_paimon
SELECT
order_id,
customer_id,
product_id,
quantity,
unit_price,
order_time
FROM orders;
EOF
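For reference, the Kafka source table above expects JSON records whose field names match the declared schema. A record shaped like the following (values are illustrative; the exact timestamp literal accepted depends on the json format's timestamp settings) would be parsed into the orders table:
{"order_id": "order-1001", "customer_id": "customer-42", "product_id": "product-7", "quantity": 3, "unit_price": 19.99, "order_time": "2024-05-01 12:00:00"}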
VI. Data Lake: Paimon Integration
1. Paimon configuration
# Create the Paimon configuration ConfigMap
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
name: paimon-config
namespace: lakehouse
data:
catalog.properties: |
metastore=hive
uri=thrift://hive-metastore:9083
warehouse=/flink/paimon
table.properties: |
file.format=orc
write.buffer-size=256MB
write.buffer-spillable=true
write-only=false
merge-engine=deduplicate
changelog-producer=input
full-compaction.delta-commits=5
snapshot.num-retain.min=10
snapshot.num-retain.max=100
snapshot.time-retain=7d
bucket=4
EOF
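These catalog properties can equivalently be used from the Flink SQL client by registering a Paimon catalog backed by the Hive Metastore before creating tables. A minimal sketch (the catalog name paimon_catalog is illustrative, and it assumes the Paimon and Hive connector jars are on the Flink classpath):
-- Register a Hive-Metastore-backed Paimon catalog in Flink SQL
CREATE CATALOG paimon_catalog WITH (
  'type' = 'paimon',
  'metastore' = 'hive',
  'uri' = 'thrift://hive-metastore:9083',
  'warehouse' = '/flink/paimon'
);
USE CATALOG paimon_catalog;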
2. Hive Metastore deployment
# Deploy the Hive Metastore
cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
name: hive-metastore
namespace: lakehouse
spec:
replicas: 1
selector:
matchLabels:
app: hive-metastore
template:
metadata:
labels:
app: hive-metastore
spec:
containers:
- name: metastore
image: apache/hive:3.1.3
env:
- name: SERVICE_NAME
value: metastore
- name: DB_DRIVER
value: org.postgresql.Driver
- name: DB_URL
value: jdbc:postgresql://postgres:5432/metastore
- name: DB_USER
value: hive
- name: DB_PASSWORD
value: hive
ports:
- containerPort: 9083
volumeMounts:
- name: hive-config
mountPath: /opt/hive/conf
volumes:
- name: hive-config
configMap:
name: hive-config
---
apiVersion: v1
kind: Service
metadata:
name: hive-metastore
namespace: lakehouse
spec:
ports:
- port: 9083
targetPort: 9083
selector:
app: hive-metastore
---
apiVersion: v1
kind: ConfigMap
metadata:
name: hive-config
namespace: lakehouse
data:
hive-site.xml: |
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:postgresql://postgres:5432/metastore</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.postgresql.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/flink/paimon</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hive-metastore:9083</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
</configuration>
EOF
3. PostgreSQL deployment (Hive Metastore backend)
# Deploy PostgreSQL
cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
name: postgres
namespace: lakehouse
spec:
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:13
env:
- name: POSTGRES_DB
value: metastore
- name: POSTGRES_USER
value: hive
- name: POSTGRES_PASSWORD
value: hive
ports:
- containerPort: 5432
volumeMounts:
- name: postgres-storage
mountPath: /var/lib/postgresql/data
volumes:
- name: postgres-storage
persistentVolumeClaim:
claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
name: postgres
namespace: lakehouse
spec:
ports:
- port: 5432
selector:
app: postgres
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: postgres-pvc
namespace: lakehouse
spec:
accessModes:
- ReadWriteOnce
storageClassName: nfs-storage
resources:
requests:
storage: 20Gi
EOF
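Before starting the Metastore, you can check that PostgreSQL is reachable with the credentials above; a quick probe from a temporary pod (pg-check is just an ad-hoc pod name):
# Verify PostgreSQL connectivity from inside the cluster
kubectl run pg-check --rm -ti --image=postgres:13 -n lakehouse \
  --env="PGPASSWORD=hive" -- psql -h postgres -U hive -d metastore -c "\l"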
VII. Data Warehouse: Doris Deployment
1. Doris FE deployment
# Create the Doris FE configuration
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
name: doris-fe-config
namespace: lakehouse
data:
fe.conf: |
meta_dir=/opt/doris/doris-meta
http_port=8030
rpc_port=9020
query_port=9030
edit_log_port=9010
mysql_service_nic_enabled=true
enable_fqdn_mode=false
priority_networks=10.244.0.0/16
cluster_name=doris-cluster
metadata_failure_recovery=true
heartbeat_timeout_second=15
elect_num=3
ignore_meta_check=false
master_ip=10.244.0.10
master_port=9010
quorum_peer_limit=5
txn_rollback_limit=100
max_consumer_num_per_group=3
async_checkpoint_task_thread_num=10
metadata_checkpoint_interval_second=60
edit_log_roll_num=50000
sys_log_dir=/opt/doris/log
sys_log_roll_mode=SIZE-MAX-AGE
sys_log_roll_num_mb=1024
sys_log_roll_interval=DAY
sys_log_verbose_modules=*
log_level=INFO
audit_log_dir=/opt/doris/log/audit
audit_log_modules=slow_query,query
audit_log_roll_num_mb=1024
audit_log_roll_interval=DAY
qe_slow_log_ms=5000
EOF
# Create the Doris FE Service and StatefulSet
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
name: doris-fe-service
namespace: lakehouse
labels:
app: doris-fe
spec:
ports:
- name: http
port: 8030
targetPort: 8030
- name: rpc
port: 9020
targetPort: 9020
- name: query
port: 9030
targetPort: 9030
- name: edit-log
port: 9010
targetPort: 9010
clusterIP: None
selector:
app: doris-fe
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: doris-fe
namespace: lakehouse
spec:
serviceName: doris-fe-service
replicas: 3
selector:
matchLabels:
app: doris-fe
template:
metadata:
labels:
app: doris-fe
spec:
containers:
- name: fe
image: apache/doris:2.1.6-fe-x86_64
ports:
- containerPort: 8030
- containerPort: 9020
- containerPort: 9030
- containerPort: 9010
env:
- name: FE_ROLE
value: "observer"
volumeMounts:
- name: doris-fe-config
mountPath: /opt/doris/conf/fe.conf
subPath: fe.conf
- name: doris-fe-meta
mountPath: /opt/doris/doris-meta
- name: doris-fe-log
mountPath: /opt/doris/log
readinessProbe:
exec:
command:
- /bin/bash
- -c
- "mysql -h127.0.0.1 -P9030 -uroot -e 'SELECT 1'"
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
volumes:
- name: doris-fe-config
configMap:
name: doris-fe-config
volumeClaimTemplates:
- metadata:
name: doris-fe-meta
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: nfs-storage
resources:
requests:
storage: 10Gi
- metadata:
name: doris-fe-log
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: nfs-storage
resources:
requests:
storage: 5Gi
EOF
2. Doris BE deployment
# Create the Doris BE configuration
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
name: doris-be-config
namespace: lakehouse
data:
be.conf: |
be_port=9060
webserver_port=8040
heartbeat_service_port=9050
brpc_port=8060
storage_root_path=/opt/doris/storage
sys_log_dir=/opt/doris/log
sys_log_roll_mode=SIZE-MAX-AGE
sys_log_roll_num_mb=1024
sys_log_roll_interval=DAY
sys_log_verbose_modules=*
log_level=INFO
be_http_port=8040
be_rpc_port=9060
brpc_port=8060
heartbeat_service_port=9050
be_socket_timeout_second=120
be_thrift_client_timeout_second=60
max_consumer_num_per_group=3
async_checkpoint_task_thread_num=10
metadata_checkpoint_interval_second=60
edit_log_roll_num=50000
priority_networks=10.244.0.0/16
storage_root_path=/opt/doris/storage
max_sys_log_file_num=100
max_audit_log_file_num=100
qe_slow_log_ms=5000
EOF
# Create the Doris BE Service and StatefulSet
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
name: doris-be-service
namespace: lakehouse
labels:
app: doris-be
spec:
ports:
- name: be
port: 9060
targetPort: 9060
- name: web
port: 8040
targetPort: 8040
- name: heartbeat
port: 9050
targetPort: 9050
- name: brpc
port: 8060
targetPort: 8060
clusterIP: None
selector:
app: doris-be
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: doris-be
namespace: lakehouse
spec:
serviceName: doris-be-service
replicas: 3
selector:
matchLabels:
app: doris-be
template:
metadata:
labels:
app: doris-be
spec:
containers:
- name: be
image: apache/doris:2.1.6-be-x86_64
ports:
- containerPort: 9060
- containerPort: 8040
- containerPort: 9050
- containerPort: 8060
env:
- name: FE_ADDR
value: "doris-fe-service:9030"
volumeMounts:
- name: doris-be-config
mountPath: /opt/doris/conf/be.conf
subPath: be.conf
- name: doris-be-storage
mountPath: /opt/doris/storage
- name: doris-be-log
mountPath: /opt/doris/log
readinessProbe:
exec:
command:
- /bin/bash
- -c
- "curl -s http://localhost:8040/api/health"
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
volumes:
- name: doris-be-config
configMap:
name: doris-be-config
volumeClaimTemplates:
- metadata:
name: doris-be-storage
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: nfs-storage
resources:
requests:
storage: 100Gi
- metadata:
name: doris-be-log
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: nfs-storage
resources:
requests:
storage: 5Gi
EOF
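The BE pods still need to be registered with the FE before they can store tablets. If the image does not do this automatically through the FE_ADDR variable, they can be added manually over the MySQL protocol; a sketch assuming the headless-service pod DNS names follow the StatefulSet pattern:
# Register the BE nodes with the FE (heartbeat port 9050)
mysql -h doris-fe-service -P9030 -uroot -e "
ALTER SYSTEM ADD BACKEND \"doris-be-0.doris-be-service.lakehouse.svc.cluster.local:9050\";
ALTER SYSTEM ADD BACKEND \"doris-be-1.doris-be-service.lakehouse.svc.cluster.local:9050\";
ALTER SYSTEM ADD BACKEND \"doris-be-2.doris-be-service.lakehouse.svc.cluster.local:9050\";
SHOW BACKENDS;"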
3. Doris initialization
# Create the Doris initialization job
cat <<EOF | kubectl apply -f -
apiVersion: batch/v1
kind: Job
metadata:
name: doris-init
namespace: lakehouse
spec:
template:
spec:
containers:
- name: doris-init
image: mysql:8.0
command: ["/bin/sh", "-c"]
args:
- |
until mysql -hdoris-fe-service -P9030 -uroot -e "SELECT 1"; do
echo "Waiting for Doris FE to be ready..."
sleep 5
done
# Create the database and tables
mysql -hdoris-fe-service -P9030 -uroot -e "
CREATE DATABASE IF NOT EXISTS lakehouse;
USE lakehouse;
-- Orders table
CREATE TABLE IF NOT EXISTS orders (
order_id VARCHAR(100),
customer_id VARCHAR(100),
product_id VARCHAR(100),
quantity INT,
unit_price DECIMAL(10,2),
order_time DATETIME,
total_amount DECIMAL(12,2) GENERATED ALWAYS AS (quantity * unit_price) STORED,
PRIMARY KEY (order_id)
) DISTRIBUTED BY HASH(order_id) BUCKETS 10
PROPERTIES (
"replication_num" = "3",
"storage_medium" = "SSD"
);
-- Customers table
CREATE TABLE IF NOT EXISTS customers (
customer_id VARCHAR(100),
customer_name VARCHAR(100),
email VARCHAR(100),
registration_date DATETIME,
PRIMARY KEY (customer_id)
) DISTRIBUTED BY HASH(customer_id) BUCKETS 5
PROPERTIES (
"replication_num" = "3",
"storage_medium" = "SSD"
);
-- Products table
CREATE TABLE IF NOT EXISTS products (
product_id VARCHAR(100),
product_name VARCHAR(200),
category VARCHAR(100),
price DECIMAL(10,2),
PRIMARY KEY (product_id)
) DISTRIBUTED BY HASH(product_id) BUCKETS 5
PROPERTIES (
"replication_num" = "3",
"storage_medium" = "SSD"
);
"
restartPolicy: OnFailure
EOF
VIII. High-Availability Configuration
1. Kubernetes high availability
# Create PodDisruptionBudgets for the core components
cat <<EOF | kubectl apply -f -
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: kafka-pdb
namespace: lakehouse
spec:
minAvailable: 2
selector:
matchLabels:
app: kafka
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: zookeeper-pdb
namespace: lakehouse
spec:
minAvailable: 2
selector:
matchLabels:
app: zookeeper
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: doris-fe-pdb
namespace: lakehouse
spec:
minAvailable: 2
selector:
matchLabels:
app: doris-fe
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: doris-be-pdb
namespace: lakehouse
spec:
minAvailable: 2
selector:
matchLabels:
app: doris-be
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: flink-taskmanager-pdb
namespace: lakehouse
spec:
minAvailable: 2
selector:
matchLabels:
app: flink
component: taskmanager
EOF
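To realize the elastic-scaling principle from the overview, the TaskManager Deployment can additionally be paired with a HorizontalPodAutoscaler. A sketch assuming the metrics-server is installed and CPU requests are set on the TaskManager containers; the replica bounds are illustrative:
cat <<EOF | kubectl apply -f -
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: flink-taskmanager-hpa
  namespace: lakehouse
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: flink-taskmanager
  minReplicas: 3
  maxReplicas: 6
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
EOF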
2. Flink high availability
Flink high availability is already enabled in the earlier configuration through the following parameters:
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: flink-cluster
high-availability.storageDir: file:///flink/ha/
3. Doris high availability
Doris high availability is achieved through:
- FE high availability: 3 FE nodes (1 master, 2 observers)
- BE high availability: 3 BE nodes with a data replication factor of 3
- Metadata high availability: FE metadata is replicated across the FE nodes, while PostgreSQL backs the Hive Metastore
4. Kafka high availability
Kafka high availability is achieved through:
- Broker high availability: 3 broker nodes
- Topic replication: every topic uses a replication factor of 3
- ISR configuration: min.insync.replicas=2
IX. Monitoring and Alerting
1. Prometheus deployment
# Deploy the kube-prometheus-stack
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/kube-prometheus-stack \
--namespace monitoring \
--set prometheus.prometheusSpec.serviceMonitorSelectorNilUsesHelmValues=false \
--set grafana.adminPassword=admin123 \
--set grafana.service.type=NodePort \
--set grafana.service.nodePort=30080
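After the chart finishes installing, verify the monitoring stack and note how to reach Grafana:
# Verify the monitoring stack
kubectl get pods -n monitoring
# Grafana is exposed on NodePort 30080 with the admin password set above (admin123)
# http://<node-ip>:30080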
2. Monitoring configuration
# Create the Kafka JMX metrics rules
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-metrics
namespace: lakehouse
data:
kafka-metrics.yml: |
lowercaseOutputName: true
rules:
- pattern: kafka.server<type=(.+), name=(.+)PerSec\w*, ><>Value
name: kafka_server_$1_$2
type: GAUGE
- pattern: kafka.server<type=(.+), name=(.+), (.+)=(.+)><>Value
name: kafka_server_$1_$2
labels:
$3: "$4"
type: GAUGE
EOF
# Create the Kafka ServiceMonitor
cat <<EOF | kubectl apply -f -
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: kafka
namespace: lakehouse
labels:
app: kafka
spec:
selector:
matchLabels:
app: kafka
endpoints:
- port: kafka
interval: 30s
path: /metrics
scheme: http
metricRelabelings:
- sourceLabels: [__name__]
regex: 'kafka_server_(.+)'
targetLabel: __name__
replacement: 'kafka_$1'
EOF
# Create the Flink ServiceMonitor
cat <<EOF | kubectl apply -f -
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: flink
namespace: lakehouse
labels:
app: flink
spec:
selector:
matchLabels:
app: flink
endpoints:
- port: prom
interval: 30s
path: /metrics
scheme: http
EOF
# Create the Doris ServiceMonitor
cat <<EOF | kubectl apply -f -
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: doris
namespace: lakehouse
labels:
app: doris
spec:
selector:
matchLabels:
app: doris
endpoints:
- port: web
interval: 30s
path: /metrics
scheme: http
EOF
3. Alerting rules
# Create the alerting rules
cat <<EOF | kubectl apply -f -
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
name: lakehouse-alerts
namespace: monitoring
spec:
groups:
- name: lakehouse.rules
rules:
- alert: KafkaDown
expr: up{job="kafka"} == 0
for: 5m
labels:
severity: critical
annotations:
summary: "Kafka instance down"
description: "Kafka instance {{ \$labels.instance }} has been down for more than 5 minutes"
- alert: FlinkJobManagerDown
expr: up{job="flink", component="jobmanager"} == 0
for: 2m
labels:
severity: critical
annotations:
summary: "Flink JobManager down"
description: "Flink JobManager {{ \$labels.instance }} has been down for more than 2 minutes"
- alert: DorisFeDown
expr: up{job="doris", component="fe"} == 0
for: 2m
labels:
severity: critical
annotations:
summary: "Doris FE down"
description: "Doris FE {{ \$labels.instance }} has been down for more than 2 minutes"
- alert: DorisBeDown
expr: up{job="doris", component="be"} == 0
for: 2m
labels:
severity: critical
annotations:
summary: "Doris BE down"
description: "Doris BE {{ \$labels.instance }} has been down for more than 2 minutes"
- alert: HighKafkaConsumerLag
expr: kafka_consumergroup_lag > 10000
for: 10m
labels:
severity: warning
annotations:
summary: "High Kafka consumer lag"
description: "Kafka consumer group {{ \$labels.consumergroup }} has lag of {{ \$value }} messages"
EOF
X. Data Flow Validation
1. Simulated data producer
# Create a Kafka data producer job
cat <<EOF | kubectl apply -f -
apiVersion: batch/v1
kind: Job
metadata:
name: kafka-producer
namespace: lakehouse
spec:
template:
spec:
containers:
- name: producer
image: confluentinc/cp-kafka:7.3.0
command: ["/bin/sh", "-c"]
args:
- |
# Generate order records
for i in {1..1000}; do
order_id="order-$(date +%s%N | cut -c1-13)-$i"
customer_id="customer-$(($RANDOM % 100))"
product_id="product-$(($RANDOM % 50))"
quantity=$(($RANDOM % 10 + 1))
unit_price=$(echo "scale=2; $RANDOM/100 + 10" | bc)
order_time=$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")
echo "{\"order_id\":\"$order_id\",\"customer_id\":\"$customer_id\",\"product_id\":\"$product_id\",\"quantity\":$quantity,\"unit_price\":$unit_price,\"order_time\":\"$order_time\"}" | \
kafka-console-producer --broker-list kafka-service:9092 --topic orders
sleep 0.1
done
restartPolicy: Never
EOF
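To confirm that the producer job actually wrote records, consume a few messages from the orders topic using the same client image:
# Read a few records back from the orders topic
kubectl run kafka-consumer --rm -ti --image confluentinc/cp-kafka:7.3.0 --namespace lakehouse -- \
  kafka-console-consumer --bootstrap-server kafka-service:9092 --topic orders --from-beginning --max-messages 5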
2. Data validation
# Open a shell in a Flink TaskManager pod (the TaskManagers run as a Deployment, so address it by deployment)
kubectl exec -it deploy/flink-taskmanager -n lakehouse -- /bin/bash
# Inside the TaskManager, submit the validation job
./bin/flink run \
-d \
-m flink-jobmanager:8081 \
-p 1 \
-ynm PaimonValidation \
./examples/sql/StreamSQLJob.jar \
--sql-file /flink/sql/validate.sql
# Create the validation SQL ConfigMap
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-validate-config
namespace: lakehouse
data:
validate.sql: |
-- Query the data in the Paimon table
CREATE TABLE paimon_orders (
order_id STRING,
customer_id STRING,
product_id STRING,
quantity INT,
unit_price DECIMAL(10,2),
order_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'paimon',
'warehouse' = 'file:///flink/paimon',
'table' = 'orders',
'scan.snapshot-id' = 'latest'
);
-- Print the query result
SELECT * FROM paimon_orders LIMIT 10;
EOF
3. Doris data synchronization
# Create the Doris data synchronization job
cat <<EOF | kubectl apply -f -
apiVersion: batch/v1
kind: Job
metadata:
name: doris-sync
namespace: lakehouse
spec:
template:
spec:
containers:
- name: sync
image: mysql:8.0
command: ["/bin/sh", "-c"]
args:
- |
# Wait for Paimon data to be generated
sleep 60
# Read data from the Paimon files and write it into Doris
mysql -hdoris-fe-service -P9030 -uroot -e "
USE lakehouse;
-- Create a staging table
CREATE TABLE temp_orders LIKE orders;
-- Load data from Paimon into Doris
LOAD LABEL lakehouse.orders_paimon
(
DATA INFILE('/flink/paimon/orders/data/*.orc')
INTO TABLE temp_orders
FORMAT AS 'orc'
(order_id, customer_id, product_id, quantity, unit_price, order_time)
SET (total_amount = quantity * unit_price)
)
WITH BROKER 'broker_name'
PROPERTIES (
'timeout' = '3600',
'max_filter_ratio' = '0.1'
);
-- Merge the data into the main table
INSERT INTO orders
SELECT * FROM temp_orders
ON DUPLICATE KEY UPDATE
customer_id = VALUES(customer_id),
product_id = VALUES(product_id),
quantity = VALUES(quantity),
unit_price = VALUES(unit_price),
order_time = VALUES(order_time),
total_amount = VALUES(total_amount);
-- Drop the staging table
DROP TABLE temp_orders;
"
restartPolicy: OnFailure
EOF
4. Data verification
# Verify the data in Doris
kubectl exec -it doris-fe-0 -n lakehouse -- /bin/bash
# Inside the Doris FE pod, run
mysql -h127.0.0.1 -P9030 -uroot -e "
USE lakehouse;
SELECT COUNT(*) FROM orders;
SELECT * FROM orders ORDER BY order_time DESC LIMIT 10;
"
XI. Performance Tuning Recommendations
1. Kafka tuning
# Kafka tuning environment variables (add to the Kafka StatefulSet container spec)
env:
- name: KAFKA_HEAP_OPTS
  value: "-Xmx4G -Xms4G"
- name: KAFKA_JVM_PERFORMANCE_OPTS
  value: "-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
- name: KAFKA_LOG_RETENTION_HOURS
  value: "168"
- name: KAFKA_LOG_SEGMENT_BYTES
  value: "1073741824"
- name: KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS
  value: "300000"
- name: KAFKA_NUM_PARTITIONS
  value: "3"
- name: KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR
  value: "4"
- name: KAFKA_SOCKET_SEND_BUFFER_BYTES
  value: "1024000"
- name: KAFKA_SOCKET_RECEIVE_BUFFER_BYTES
  value: "1024000"
- name: KAFKA_SOCKET_REQUEST_MAX_BYTES
  value: "104857600"
- name: KAFKA_LOG_FLUSH_INTERVAL_MESSAGES
  value: "10000"
- name: KAFKA_LOG_FLUSH_INTERVAL_MS
  value: "1000"
2. Flink tuning
# Flink tuning environment variables (add to the JobManager/TaskManager container specs)
env:
- name: FLINK_JM_HEAP
  value: "4096m"
- name: FLINK_TM_HEAP
  value: "8192m"
- name: FLINK_TM_OFFHEAP
  value: "2048m"
- name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
  value: "8"
- name: PARALLELISM_DEFAULT
  value: "12"
- name: REST_FLAMEGRAPH_ENABLED
  value: "true"
3. Doris tuning
-- Doris table tuning
ALTER TABLE orders SET ("replication_num" = "3", "storage_medium" = "SSD", "storage_cooldown_time" = "2025-01-01 00:00:00");
ALTER TABLE customers SET ("replication_num" = "3", "storage_medium" = "SSD", "storage_cooldown_time" = "2025-01-01 00:00:00");
ALTER TABLE products SET ("replication_num" = "3", "storage_medium" = "SSD", "storage_cooldown_time" = "2025-01-01 00:00:00");
-- Create a materialized view
CREATE MATERIALIZED VIEW mv_customer_orders
DISTRIBUTED BY HASH(customer_id) BUCKETS 10
AS
SELECT
customer_id,
COUNT(*) AS order_count,
SUM(total_amount) AS total_spent,
AVG(total_amount) AS avg_order_value,
MAX(order_time) AS last_order_date
FROM orders
GROUP BY customer_id;
-- Create a rollup
ALTER TABLE orders ADD ROLLUP orders_rollup (customer_id, product_id, order_time, total_amount);
4. Paimon tuning
-- Paimon table tuning
ALTER TABLE orders_paimon SET (
'write.buffer-size'='512MB',
'write.buffer-spillable'='true',
'write-only'='false',
'merge-engine'='deduplicate',
'changelog-producer'='input',
'full-compaction.delta-commits'='10',
'bucket'='8',
'file.format'='orc',
'orc.compress'='zlib',
'orc.compress.size'='262144',
'orc.stripe.size'='67108864',
'orc.row.index.stride'='10000'
);
XII. Failure Recovery Drills
1. Kafka node failure simulation
# Simulate a Kafka node failure by scaling in
kubectl scale statefulsets kafka --replicas=2 -n lakehouse
# Watch the Kafka pods
kubectl get pods -l app=kafka -n lakehouse -w
# Restore the node
kubectl scale statefulsets kafka --replicas=3 -n lakehouse
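While a broker is down, the topics should report under-replicated partitions yet remain writable, since the replication factor is 3 and min.insync.replicas is 2. This can be checked from the client pod:
# Check for under-replicated partitions during the drill (run inside the kafka-client pod)
kafka-topics --describe --under-replicated-partitions --bootstrap-server kafka-service:9092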
2. Flink JobManager failure simulation
# Simulate a Flink JobManager failure (the JobManager runs as a Deployment, so delete its pod by label)
kubectl delete pod -l app=flink,component=jobmanager -n lakehouse
# Watch the JobManager pod get recreated
kubectl get pods -l app=flink,component=jobmanager -n lakehouse -w
# Check job recovery from the TaskManager logs
kubectl logs deploy/flink-taskmanager -n lakehouse | grep "JobManager"
3. Doris FE failure simulation
# Simulate a Doris FE failure
kubectl delete pod doris-fe-0 -n lakehouse
# Watch the Doris FE pods
kubectl get pods -l app=doris-fe -n lakehouse -w
# Check the FE role change
kubectl exec -it doris-fe-1 -n lakehouse -- mysql -h127.0.0.1 -P9030 -uroot -e "SHOW FRONTENDS;"
4. Data consistency verification
# Create a data consistency check job
cat <<EOF | kubectl apply -f -
apiVersion: batch/v1
kind: Job
metadata:
name: data-consistency-check
namespace: lakehouse
spec:
template:
spec:
containers:
- name: checker
image: mysql:8.0
command: ["/bin/sh", "-c"]
args:
- |
# Compare the row counts between Paimon and Doris (here both counts are read from Doris; the Paimon-side count should in practice come from a Flink SQL query against the Paimon orders table)
paimon_count=$(mysql -hdoris-fe-service -P9030 -uroot -e "SELECT COUNT(*) FROM lakehouse.orders;" -s -N)
doris_count=$(mysql -hdoris-fe-service -P9030 -uroot -e "SELECT COUNT(*) FROM lakehouse.orders;" -s -N)
if [ "$paimon_count" -eq "$doris_count" ]; then
echo "Data consistency check passed: Paimon count = $paimon_count, Doris count = $doris_count"
else
echo "Data inconsistency detected: Paimon count = $paimon_count, Doris count = $doris_count"
exit 1
fi
restartPolicy: OnFailure
EOF
XIII. Summary
This guide has walked through deploying a complete, highly available lakehouse architecture on Kubernetes, built from the following key components:
- Kafka 3.0: distributed message queue providing high-throughput data ingestion and transport
- Flink 1.18: powerful real-time stream processing with exactly-once semantics
- Paimon 0.7: open data lake storage format providing the unified lake storage layer
- Doris 2.1.6: high-performance MPP analytical database providing fast queries
- Kubernetes: container orchestration platform providing resource management, auto-scaling, and high availability
1. Architecture strengths
- High availability: every component runs with multiple replicas, avoiding single points of failure
- Elastic scaling: Kubernetes auto-scaling adjusts resources dynamically with load
- Data consistency: transactional mechanisms and replica synchronization keep data consistent
- Unified metadata: Hive Metastore acts as the central metadata service
- Resource isolation: Kubernetes namespaces provide multi-tenant resource isolation
2. Key configuration points
(1) Kafka:
- 3-node broker cluster
- Topic replication factor of 3
- min.insync.replicas set to 2
(2) Flink:
- JobManager high-availability configuration
- Checkpointing
- RocksDB state backend
(3) Paimon:
- ORC file format
- Merge-engine configuration
- Incremental snapshot management
(4) Doris:
- 3-node FE cluster (1 master, 2 standbys)
- 3-node BE cluster
- Data replication factor of 3
3. Operational recommendations
- Monitoring: deploy Prometheus and Grafana for full observability
- Alerting: configure alert rules on key metrics
- Backup: back up metadata and critical configuration regularly
- Drills: run failure recovery drills regularly
- Tuning: keep tuning performance parameters against the actual workload
With the deployment plan in this guide, you can build a stable, efficient, and scalable highly available lakehouse architecture that meets the needs of a modern data platform.