組閤 MQTT 客戶端與 Web 套接字連接。
WebSockets MQTT 訂閱 展示如何設計自定義 QIODevice 以組閤 web socket 連接采用 QMqttClient .
注意: Since Qt 6.10, WebSockets are supported natively by Qt MQTT. This example is retained to demonstrate how to implement a custom transport.
The new custom device,
WebSocketIODevice
, has to be a subclass of
QIODevice
:
class WebSocketIODevice : public QIODevice { Q_OBJECT public: WebSocketIODevice(QObject *parent = nullptr); bool isSequential() const override; qint64 bytesAvailable() const override; bool open(OpenMode mode) override; void close() override; qint64 readData(char *data, qint64 maxlen) override; qint64 writeData(const char *data, qint64 len) override; void setUrl(const QUrl &url); void setProtocol(const QByteArray &data); Q_SIGNALS: void socketConnected(); public slots: void handleBinaryMessage(const QByteArray &msg); void onSocketConnected(); private: QByteArray m_protocol; QByteArray m_buffer; QWebSocket m_socket; QUrl m_url; };
WebSocketIODevice
will be a private member of the
ClientSubscription
class alongside the
QMqttClient
和
QMqttSubscription
:
private: QMqttClient m_client; QMqttSubscription *m_subscription; QUrl m_url; QString m_topic; WebSocketIODevice m_device; int m_version;
The main logic is implemented in the
connectAndSubscribe()
方法在
ClientSubscription
class. You need to verify that the web socket has successfully connected before you can initialize an MQTT connection over it. After the MQTT connection has been established, the
QMqttClient
can subscribe to the topic. If the subscription is successful, the
QMqttSubscription
can be used to receive messages from the subscribed topic that will be handled by the
handleMessage()
方法在
ClientSubscription
類。
void ClientSubscription::connectAndSubscribe() { qCDebug(lcWebSocketMqtt) << "Connecting to broker at " << m_url; m_device.setUrl(m_url); m_device.setProtocol(m_version == 3 ? "mqttv3.1" : "mqtt"); connect(&m_device, &WebSocketIODevice::socketConnected, this, [this]() { qCDebug(lcWebSocketMqtt) << "WebSocket connected, initializing MQTT connection."; m_client.setProtocolVersion(m_version == 3 ? QMqttClient::MQTT_3_1 : QMqttClient::MQTT_3_1_1); m_client.setTransport(&m_device, QMqttClient::IODevice); connect(&m_client, &QMqttClient::connected, this, [this]() { qCDebug(lcWebSocketMqtt) << "MQTT connection established"; m_subscription = m_client.subscribe(m_topic); if (!m_subscription) { qDebug() << "Failed to subscribe to " << m_topic; emit errorOccured(); } connect(m_subscription, &QMqttSubscription::stateChanged, [](QMqttSubscription::SubscriptionState s) { qCDebug(lcWebSocketMqtt) << "Subscription state changed:" << s; }); connect(m_subscription, &QMqttSubscription::messageReceived, [this](QMqttMessage msg) { handleMessage(msg.payload()); }); }); m_client.connectToHost(); }); if (!m_device.open(QIODevice::ReadWrite)) qDebug() << "Could not open socket device"; }
文件: