Я уже написал статьюПочему MQTT предпочтителен для интеллектуального оборудования - Nuggets,На этот раз я создам свою собственную платформу взаимодействия MQTT.,Испытайте это на самом деле,Как это можно сделать без реального боя?
Платформа, которую я здесь использую,EMQX Cloud,Вы можете бесплатно подать заявку на устройство MQTT Служить через свою учетную запись github.,Особенно удобно для частных лиц.,в то же времяиспользоватьиспользовать MQTT Быстрый тест клиента MQTT СлужитьДля мониторинга или имитации доставки,Здесь мы решили открыть его бесплатно,Нажмите «Развернуть сейчас» и согласитесь на его создание.
После того, как он установлен, нажимаем «Управление проектами», и появится сервер, на который мы только что подали заявку. После входа нажмите «Пуск», чтобы мы могли запустить службу.
Нажмите «Аутентификация», выберите «Аутентификация», а затем нажмите «Добавить» справа, чтобы создать пользователя для подключения. Имя и пароль этого пользователя — это имя пользователя и пароль, которые понадобятся нашему клиенту при установлении соединения. На этом этапе мы можем перейти к клиенту и написать код подключения.
dependencies {
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<application
...
<service android:name="org.eclipse.paho.android.service.MqttService" />
</application>
private static MqttAndroidClient mqttAndroidClient;
private static String mqttUsername = ""; //Имя пользователя, созданное Служить
private static String mqttPassword = ""; //Имя пользователя и пароль созданы СлужитьDuanba
private static String clientId = ""; //Уникальный идентификатор не может повторяться
//Получаем очередь сообщения
public static final LinkedBlockingQueue<MyMessage> SERVER_QUEUE = new LinkedBlockingQueue<>(
200);
//Тема подписки на сообщения может быть настроена
private static final String topic = "/" + mqttUsername + "/" + clientId + "/user/get";
public static void initIot() {
String serverUrl = "Служить адрес:порт";
try {
mqttAndroidClient = new MqttAndroidClient(context, serverUrl, "clientId");
mqttAndroidClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
Log.i(TAG,"соединятьотключиться"); }
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.i(TAG, «Получено сообщение:» + message.toString());
//предположениеиспользоватьочередьперенимать MyMessage myMessage = new MyMessage();
myMessage.setData(message.getPayload());
boolean offer = SERVER_QUEUE.offer(aMessage);
if (!offer) {
Log.e(TAG, "очередь заполнена и не может принимать сообщения!");
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.i(TAG, "deliveryComplete: " + token.toString());
}
});
//Создаем правила соединения
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttUsername);
options.setPassword(mqttPassword.toCharArray());
options.setCleanSession(true);
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //MQTT-версия
options.setConnectionTimeout(10); //соединять таймаут
options.setKeepAliveInterval(180); //Интервал пульса
options.setMaxInflight(100); //Максимальное количество запросов, по умолчанию 10, это значение можно увеличить в сценариях с высоким трафиком
options.setAutomaticReconnect(true); //Настраиваем автоматический перезапуск соединения
mqttAndroidClient.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i(TAG,"соединятьуспех"); //здесь Подписаться на новости
subscribe();
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.i(TAG,"соединятьнеудача" + exception);
}
});
} catch (Exception e) {
Log.e(TAG, "INIT IOT ERROR!");
}
}
public class MyMessage {
public Object data;
public MyMessage() {
}
public MyMessage(Object data) {
this.data = data;
}
public Object getData() {
return this.data;
}
public void setData(Object data) {
this.data = data;
}
}
private static void subscribe() {
try {
mqttAndroidClient.subscribe(topic, 1, null,
new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i(TAG,
"Подписка прошла успешно topic: "
+ topic);
}
@Override
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
Log.e(TAG, «Подписка не удалась!» + exception.getMessage());
}
});
} catch (Exception e) {
Log.e(TAG, «Подписка не удалась!» + e.getMessage());
}
}
//Сообщение отправлено в очередь
public static final LinkedBlockingQueue<String> CLIENT_QUEUE = new LinkedBlockingQueue<>(1000);
//Опубликовать сообщение Вызовите этот метод
public static void putQueue(String msg) {
boolean offer = CLIENT_QUEUE.offer(msg);
if (!offer) {
Log.w(TAG, "Операционная очередь заполнена!");
}
}
//использовать поток для чтения, это может предотвратить несколько вызовов одновременно, и в то же время событие отправки не будет потеряно.
static class IotPublishRunnable implements Runnable {
@Override
public void run() {
while (true) {
try {
String msg = CLIENT_QUEUE.take();
if (TextUtils.isEmpty(msg)) {
continue;
}
publish(msg);
Thread.sleep(300);
} catch (Exception e) {
Log.e(TAG, «Не удалось обработать сообщение IOT»);
}
}
}
}
private static void publishNew(String payload) {
String topic = "/" + mqttUsername + "/" + clientId + "/user/update";
Integer qos = 1;
try {
if (null == mqttAndroidClient || !mqttAndroidClient.isConnected()) {
Log.w(TAG, «IOT еще не инициализирован! Невозможно отправить сообщение»);
return;
}
mqttAndroidClient.publish(topic, payload.getBytes(StandardCharsets.UTF_8), qos, false,
null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
String[] topics = asyncActionToken.getTopics();
Log.e(TAG, "publish message error! topics: " + Arrays.toString(topics));
}
});
} catch (MqttException e) {
Log.e(TAG, «Не удалось отправить сообщение!»);
} catch (IllegalArgumentException e) {
Log.e(TAG, "MQTT CLIENT ERROR");
}
}
public static void disconnect() {
if (null == mqttAndroidClient || !mqttAndroidClient.isConnected()) {
Log.w(TAG, «IOT еще не инициализирован!»);
return;
}
try {
mqttAndroidClient.disconnect().setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i(TAG, "Отключить соединение успешно!");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.i(TAG, "Не удалось отключить соединение!");
}
});
} catch (MqttException e) {
Log.e(TAG, e.getMessage());
}
}
Выше приведен код MQTT клиента.,Я написал это на Java,Kotlin版的предположение参考Android использовать Kotlin соединять MQTT,Код в основном здесь,Я не отпущу проекты или что-то в этом роде.