摘要:Java MQTT Client
這是我是從
http://tokudu.com/2010/how-to-implement-push-notifications-for-android/
這篇文章學來,並從他提供的Source,另外抽出來使用,原本是for Android,我從出來變成獨立的console程式,
https://github.com/tokudu/AndroidPushNotificationsDemo
從這裡抄出來之後,並取得他使用的wmqtt.jar
將Android的部分去掉,與獨立化之後。
變成以下內容
MQTTConnection.java
import com.ibm.mqtt.*;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
/**
* Created by IntelliJ IDEA.
* User: TomLai
* Date: 2012/9/18
* Time: 上午 10:23
* To change this template use File | Settings | File Templates.
*/
public class MQTTConnection implements MqttSimpleCallback {
IMqttClient mqttClient = null;
// Let's not use the MQTT persistence.
private static MqttPersistence MQTT_PERSISTENCE = null;
// We don't need to remember any state between the connections, so we use a clean start.
private static boolean MQTT_CLEAN_START = true;
// Let's set the internal keep alive for MQTT to 15 mins. I haven't tested this value much. It could probably be increased.
private static short MQTT_KEEP_ALIVE = 60 * 15;
// MQTT client ID, which is given the broker. In this example, I also use this for the topic header.
// You can use this to run push notifications for multiple apps with one MQTT broker.
public static String MQTT_CLIENT_ID = "tokudu";
// Set quality of services to 0 (at most once delivery), since we don't want push notifications
// arrive more than once. However, this means that some messages might get lost (delivery is not guaranteed)
private static int[] MQTT_QUALITIES_OF_SERVICE = {0};
private static int MQTT_QUALITY_OF_SERVICE = 0;
// The broker should not retain any messages.
private static boolean MQTT_RETAINED_PUBLISH = false;
private long mStartTime;
private String token = "";
public MQTTConnection(String MQTT_HOST,int MQTT_BROKER_PORT_NUM,String token) throws MqttException {
// Create connection spec
String mqttConnSpec = "tcp://" + MQTT_HOST + "@" + MQTT_BROKER_PORT_NUM;
// Create the client and connect
mqttClient = MqttClient.createMqttClient(mqttConnSpec, MQTT_PERSISTENCE);
String clientID = MQTT_CLIENT_ID + "/" + token;
mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);
// register this client app has being able to receive messages
mqttClient.registerSimpleHandler(this);
// Subscribe to an initial topic, which is combination of client ID and device ID.
String initTopic = MQTT_CLIENT_ID + "/" + token;
subscribeToTopic(initTopic);
System.out.println("Connection established to " + MQTT_HOST + " on topic " + initTopic);
// Save start time
mStartTime = System.currentTimeMillis();
}
// Disconnect
public void disconnect() {
try {
mqttClient.disconnect();
} catch (MqttPersistenceException e) {
System.out.println("MqttException");
}
}
/*
* Send a request to the message broker to be sent messages published with
* the specified topic name. Wildcards are allowed.
*/
private void subscribeToTopic(String topicName) throws MqttException {
if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
// quick sanity check - don't try and subscribe if we don't have
// a connection
System.out.println("Connection error" + "No connection");
} else {
String[] topics = {topicName};
mqttClient.subscribe(topics, MQTT_QUALITIES_OF_SERVICE);
}
}
/*
* Sends a message to the message broker, requesting that it be published
* to the specified topic.
*/
private void publishToTopic(String topicName, String message) throws MqttException {
if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
// quick sanity check - don't try and publish if we don't have
// a connection
System.out.println("No connection to public to");
} else {
mqttClient.publish(topicName,
message.getBytes(),
MQTT_QUALITY_OF_SERVICE,
MQTT_RETAINED_PUBLISH);
}
}
/*
* Called if the application loses it's connection to the message broker.
*/
public void connectionLost() throws Exception {
System.out.println("Loss of connection" + "connection downed");
}
/*
* Called when we receive a message from the message broker.
*/
public void publishArrived(String topicName, byte[] payload, int qos, boolean retained) {
// Show a notification
String s = "";
try {
s = new String(payload, "UTF-8");
ByteArrayOutputStream boas = null;
BufferedOutputStream bos = null;
boas = new ByteArrayOutputStream();
bos = new BufferedOutputStream(boas);
bos.write(payload, 0, payload.length);
bos.flush();
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println("topicName:" + topicName);
System.out.println("payload:" + s);
System.out.println("qos:"+qos);
System.out.println("retained:"+retained);
}
public void sendKeepAlive() throws MqttException {
// publish to a keep-alive topic
publishToTopic(MQTT_CLIENT_ID + "/keepalive", token);
}
}
PushService.java
import com.ibm.mqtt.MqttException;
/*
* PushService that does all of the work.
* Most of the logic is borrowed from KeepAliveService.
* http://code.google.com/p/android-random/source/browse/trunk/TestKeepAlive/src/org/devtcg/demo/keepalive/KeepAliveService.java?r=219
*/
public class PushService
{
public PushService(String token)
{
device_id = token;
}
// the port at which the broker is running.
private static int MQTT_BROKER_PORT_NUM = 1883;
// the IP address, where your MQTT broker is running.
private static final String MQTT_HOST = "127.0.0.1";
// This is the instance of an MQTT connection.
private MQTTConnection mConnection;
private static String device_id = "";
public synchronized void start() {
System.out.println("Starting service...");
// Establish an MQTT connection
connect();
}
private synchronized void stop() {
// Destroy the MQTT connection if there is one
if (mConnection != null) {
mConnection.disconnect();
mConnection = null;
}
}
//
private synchronized void connect() {
System.out.println("Connecting...");
// fetch the device ID from the preferences.
String deviceID =this.device_id;
// Create a new connection only if the device id is not NULL
if (deviceID == null) {
System.out.println("Device ID not found.");
} else {
try {
mConnection = new MQTTConnection(MQTT_HOST,MQTT_BROKER_PORT_NUM,deviceID);
} catch (MqttException e) {
// Schedule a reconnect, if we failed to connect
System.out.println("MqttException: " + (e.getMessage() != null ? e.getMessage() : "NULL"));
}
}
}
}
MQTT Server則到
http://mqtt.org/software 下載你想要的
而管理介面則去抄他的