针对每秒 1 万 QPS 持续存储的需求,尤其是涉及到像万达广场这样的大型分布式系统,设计一个高效、可靠的数据收集和存储方案至关重要。
以下是基于 MQTT 和 HBase 的整体架构思路:
数据收集和传输
MQTT(消息队列遥测传输协议):
适用场景:MQTT 适用于大量设备数据的低带宽、高延迟和不稳定网络连接环境。
特点:轻量级协议,支持发布/订阅模式,提供 QoS(服务质量)级别保证消息传输的可靠性。
具体应用:每个服务器(设备)作为 MQTT 客户端,定期将数据上报给 MQTT 代理(如 Mosquitto 或 EMQX)。
Kafka(可选):
适用场景:处理高吞吐量的数据流传输,具有持久化功能,适合大规模实时数据处理。
特点:分布式、高吞吐、横向扩展。
具体应用:MQTT 代理接收到的数据通过桥接器(如 Kafka Connect)转发到 Kafka 主题,实现数据的持久化和缓冲。
数据存储
HBase(Hadoop Database):
适用场景:高写入吞吐量和随机读取场景,适合大规模结构化数据存储。
特点:基于列存储,支持实时读写操作,具有高扩展性和容错性。
具体应用:Kafka 中的数据通过 HBase Sink Connector 或自定义消费者存储到 HBase 中。
Elasticsearch(可选):
适用场景:全文搜索和实时分析,适合日志、监控数据存储和查询。
特点:分布式搜索引擎,提供强大的搜索和分析功能。
具体应用:数据同步到 Elasticsearch,方便快速查询和分析。
数据处理和分析
Spark Streaming:
适用场景:实时数据流处理,适合大规模数据处理和分析。
特点:分布式计算引擎,支持批处理和流处理。
具体应用:实时处理 Kafka 中的数据流,进行数据清洗、聚合等操作后存储到 HBase 或 Elasticsearch。
Flink:
适用场景:高性能、低延迟的实时数据处理。
特点:分布式流处理框架,支持事件时间处理和复杂事件处理。
具体应用:与 Spark 类似,用于实时数据流的处理和分析。
监控和运维
Prometheus + Grafana:
适用场景:实时监控和告警,适合大规模分布式系统监控。
特点:强大的数据收集和告警系统,结合 Grafana 提供丰富的可视化界面。
具体应用:监控各个组件(MQTT 代理、Kafka、HBase 等)的性能和状态,确保系统的高可用性。
总体架构示意图
数据收集:
各服务器作为 MQTT 客户端,定期将数据发布到 MQTT 代理。
MQTT 代理接收到的数据通过桥接器转发到 Kafka。
数据传输和缓冲:
Kafka 作为消息队列,实现数据的持久化和缓冲,支持高吞吐量的数据传输。
数据存储:
Kafka 中的数据通过 HBase Sink Connector 存储到 HBase。
可选的 Elasticsearch 同步,用于快速查询和分析。
数据处理和分析:
使用 Spark Streaming 或 Flink 进行实时数据处理和分析。
监控和运维:
使用 Prometheus 监控系统性能,Grafana 可视化数据。
示例代码
MQTT 客户端(数据发布)
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttPublisher {
public static void main(String[] args) {
String broker = "tcp://mqtt-broker:1883";
String clientId = "ServerClientId";
try {
MqttClient client = new MqttClient(broker, clientId);
client.connect();
MqttMessage message = new MqttMessage("Your data".getBytes());
message.setQos(1);
client.publish("your/topic", message);
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
Kafka Consumer(数据存储到 HBase)
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import java.util.Collections;
import java.util.Properties;
public class KafkaToHBaseConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker:9092");
props.put("group.id", "group_id");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("your_topic"));
try (Connection connection = ConnectionFactory.createConnection()) {
Table table = connection.getTable(TableName.valueOf("your_table"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
Put put = new Put(record.key().getBytes());
put.addColumn("cf".getBytes(), "data".getBytes(), record.value().getBytes());
table.put(put);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
总结
上述方案利用 MQTT 进行数据收集,通过 Kafka 进行数据缓冲和持久化,最终存储到 HBase(或 Elasticsearch)中进行数据查询和分析。
Spark Streaming 或 Flink 可以进一步处理和分析实时数据,同时通过 Prometheus 和 Grafana 进行监控,确保系统的高可用性和稳定性。