侧边栏壁纸
博主头像
月伴飞鱼 博主等级

行动起来,活在当下

  • 累计撰写 126 篇文章
  • 累计创建 31 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

1万每秒持续的QPS用什么技术存储比较好?

月伴飞鱼
2025-04-23 / 0 评论 / 1 点赞 / 3 阅读 / 0 字
温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

针对每秒 1 万 QPS 持续存储的需求,尤其是涉及到像万达广场这样的大型分布式系统,设计一个高效、可靠的数据收集和存储方案至关重要。

以下是基于 MQTT 和 HBase 的整体架构思路:

数据收集和传输

  1. MQTT(消息队列遥测传输协议)

    • 适用场景:MQTT 适用于大量设备数据的低带宽、高延迟和不稳定网络连接环境。

    • 特点:轻量级协议,支持发布/订阅模式,提供 QoS(服务质量)级别保证消息传输的可靠性。

    • 具体应用:每个服务器(设备)作为 MQTT 客户端,定期将数据上报给 MQTT 代理(如 Mosquitto 或 EMQX)。

  2. Kafka(可选)

    • 适用场景:处理高吞吐量的数据流传输,具有持久化功能,适合大规模实时数据处理。

    • 特点:分布式、高吞吐、横向扩展。

    • 具体应用:MQTT 代理接收到的数据通过桥接器(如 Kafka Connect)转发到 Kafka 主题,实现数据的持久化和缓冲。

数据存储

  1. HBase(Hadoop Database)

    • 适用场景:高写入吞吐量和随机读取场景,适合大规模结构化数据存储。

    • 特点:基于列存储,支持实时读写操作,具有高扩展性和容错性。

    • 具体应用:Kafka 中的数据通过 HBase Sink Connector 或自定义消费者存储到 HBase 中。

  2. Elasticsearch(可选)

    • 适用场景:全文搜索和实时分析,适合日志、监控数据存储和查询。

    • 特点:分布式搜索引擎,提供强大的搜索和分析功能。

    • 具体应用:数据同步到 Elasticsearch,方便快速查询和分析。

数据处理和分析

  1. Spark Streaming

    • 适用场景:实时数据流处理,适合大规模数据处理和分析。

    • 特点:分布式计算引擎,支持批处理和流处理。

    • 具体应用:实时处理 Kafka 中的数据流,进行数据清洗、聚合等操作后存储到 HBase 或 Elasticsearch。

  2. Flink

    • 适用场景:高性能、低延迟的实时数据处理。

    • 特点:分布式流处理框架,支持事件时间处理和复杂事件处理。

    • 具体应用:与 Spark 类似,用于实时数据流的处理和分析。

监控和运维

  1. Prometheus + Grafana

    • 适用场景:实时监控和告警,适合大规模分布式系统监控。

    • 特点:强大的数据收集和告警系统,结合 Grafana 提供丰富的可视化界面。

    • 具体应用:监控各个组件(MQTT 代理、Kafka、HBase 等)的性能和状态,确保系统的高可用性。

总体架构示意图

  1. 数据收集

    • 各服务器作为 MQTT 客户端,定期将数据发布到 MQTT 代理。

    • MQTT 代理接收到的数据通过桥接器转发到 Kafka。

  2. 数据传输和缓冲

    • Kafka 作为消息队列,实现数据的持久化和缓冲,支持高吞吐量的数据传输。

  3. 数据存储

    • Kafka 中的数据通过 HBase Sink Connector 存储到 HBase。

    • 可选的 Elasticsearch 同步,用于快速查询和分析。

  4. 数据处理和分析

    • 使用 Spark Streaming 或 Flink 进行实时数据处理和分析。

  5. 监控和运维

    • 使用 Prometheus 监控系统性能,Grafana 可视化数据。

示例代码

MQTT 客户端(数据发布)

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttPublisher {
    public static void main(String[] args) {
        String broker = "tcp://mqtt-broker:1883";
        String clientId = "ServerClientId";
        try {
            MqttClient client = new MqttClient(broker, clientId);
            client.connect();
            MqttMessage message = new MqttMessage("Your data".getBytes());
            message.setQos(1);
            client.publish("your/topic", message);
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

Kafka Consumer(数据存储到 HBase)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import java.util.Collections;
import java.util.Properties;

public class KafkaToHBaseConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9092");
        props.put("group.id", "group_id");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("your_topic"));

        try (Connection connection = ConnectionFactory.createConnection()) {
            Table table = connection.getTable(TableName.valueOf("your_table"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    Put put = new Put(record.key().getBytes());
                    put.addColumn("cf".getBytes(), "data".getBytes(), record.value().getBytes());
                    table.put(put);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

总结

上述方案利用 MQTT 进行数据收集,通过 Kafka 进行数据缓冲和持久化,最终存储到 HBase(或 Elasticsearch)中进行数据查询和分析。

Spark Streaming 或 Flink 可以进一步处理和分析实时数据,同时通过 Prometheus 和 Grafana 进行监控,确保系统的高可用性和稳定性。

1
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin
    1. 支付宝打赏

      qrcode alipay
    2. 微信打赏

      qrcode weixin
博主关闭了所有页面的评论