Java MQTT Client

摘要:Java MQTT Client

這是我從

http://tokudu.com/2010/how-to-implement-push-notifications-for-android/  

這篇文章學來,並從他提供的Source另外抽出來使用。原本是for Android,我把它抽出來變成獨立的console程式。

 

https://github.com/tokudu/AndroidPushNotificationsDemo

從這裡抄出來之後,並取得他使用的wmqtt.jar

將Android的部分去掉並獨立化之後,

變成以下內容

MQTTConnection.java


import com.ibm.mqtt.*;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;

/**
 * Console-side wrapper around the IBM {@code wmqtt} client.
 *
 * <p>Constructing an instance connects to the broker, registers this object as
 * the message callback and subscribes to the topic
 * {@code MQTT_CLIENT_ID + "/" + token}. Incoming messages are printed to
 * standard output by {@link #publishArrived(String, byte[], int, boolean)}.
 *
 * @author TomLai
 * @since 2012/9/18
 */
public class MQTTConnection implements MqttSimpleCallback {
    IMqttClient mqttClient = null;

    // Single charset used for both encoding outgoing and decoding incoming payloads,
    // so a message published by this class is read back unchanged on any platform.
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

    // Let's not use the MQTT persistence.
    private static final MqttPersistence MQTT_PERSISTENCE = null;
    // We don't need to remember any state between the connections, so we use a clean start.
    private static final boolean MQTT_CLEAN_START = true;
    // Let's set the internal keep alive for MQTT to 15 mins. I haven't tested this value much. It could probably be increased.
    private static final short MQTT_KEEP_ALIVE = 60 * 15;
    // MQTT client ID, which is given the broker. In this example, I also use this for the topic header.
    // You can use this to run push notifications for multiple apps with one MQTT broker.
    // Left public and non-final because external code may already read or reassign it.
    public static String MQTT_CLIENT_ID = "tokudu";
    // Set quality of services to 0 (at most once delivery), since we don't want push notifications
    // arrive more than once. However, this means that some messages might get lost (delivery is not guaranteed)
    private static final int[] MQTT_QUALITIES_OF_SERVICE = {0};
    private static final int MQTT_QUALITY_OF_SERVICE = 0;
    // The broker should not retain any messages.
    private static final boolean MQTT_RETAINED_PUBLISH = false;

    // Wall-clock time (ms) at which the constructor finished setting up the connection.
    private long mStartTime;
    // Device token supplied to the constructor; sent as the keep-alive payload.
    private String token = "";

    /**
     * Connects to the broker and subscribes to this device's topic.
     *
     * @param MQTT_HOST            host name or IP address of the broker
     * @param MQTT_BROKER_PORT_NUM TCP port of the broker
     * @param token                device token; appended to {@link #MQTT_CLIENT_ID} to form
     *                             both the client id and the initial topic
     * @throws MqttException if creating the client, connecting or subscribing fails
     */
    public MQTTConnection(String MQTT_HOST,int MQTT_BROKER_PORT_NUM,String token) throws MqttException {
        // Remember the token: the parameter shadows the field, so without this
        // assignment sendKeepAlive() would always publish an empty payload.
        this.token = (token == null) ? "" : token;

        // Create connection spec
        String mqttConnSpec = "tcp://" + MQTT_HOST + "@" + MQTT_BROKER_PORT_NUM;
        // Create the client and connect
        mqttClient = MqttClient.createMqttClient(mqttConnSpec, MQTT_PERSISTENCE);
        String clientID = MQTT_CLIENT_ID + "/" + token;
        mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);

        // Register this object as the handler for incoming messages.
        mqttClient.registerSimpleHandler(this);

        // Subscribe to an initial topic, which is combination of client ID and device ID.
        String initTopic = MQTT_CLIENT_ID + "/" + token;
        subscribeToTopic(initTopic);

        System.out.println("Connection established to " + MQTT_HOST + " on topic " + initTopic);

        // Save start time
        mStartTime = System.currentTimeMillis();
    }

    /**
     * Disconnects from the broker. A persistence failure during disconnect is
     * reported on standard output and otherwise ignored.
     */
    public void disconnect() {
        try {
            mqttClient.disconnect();
        } catch (MqttPersistenceException e) {
            System.out.println("MqttException");
        }
    }

    /**
     * Asks the broker to deliver messages published on the given topic.
     * Wildcards are allowed. Does nothing except print a message when there is
     * no live connection.
     *
     * @param topicName topic (or topic filter) to subscribe to
     * @throws MqttException if the subscribe request fails
     */
    private void subscribeToTopic(String topicName) throws MqttException {
        if ((mqttClient == null) || !mqttClient.isConnected()) {
            // quick sanity check - don't try and subscribe if we don't have
            //  a connection
            System.out.println("Connection error" + "No connection");
        } else {
            String[] topics = {topicName};
            mqttClient.subscribe(topics, MQTT_QUALITIES_OF_SERVICE);
        }
    }

    /**
     * Asks the broker to publish {@code message} on {@code topicName}. The
     * message is encoded as UTF-8. Does nothing except print a message when
     * there is no live connection.
     *
     * @param topicName topic to publish to
     * @param message   text payload
     * @throws MqttException if the publish request fails
     */
    private void publishToTopic(String topicName, String message) throws MqttException {
        if ((mqttClient == null) || !mqttClient.isConnected()) {
            // quick sanity check - don't try and publish if we don't have
            //  a connection
            System.out.println("No connection to public to");
        } else {
            mqttClient.publish(topicName,
                    message.getBytes(UTF_8),
                    MQTT_QUALITY_OF_SERVICE,
                    MQTT_RETAINED_PUBLISH);
        }
    }

    /**
     * Callback invoked when the connection to the broker is lost.
     * Only reports the event; no reconnect is attempted here.
     */
    public void connectionLost() throws Exception {
        System.out.println("Loss of connection" + "connection downed");
    }

    /**
     * Callback invoked when a message arrives from the broker. Decodes the
     * payload as UTF-8 and prints the topic, payload, QoS and retained flag.
     *
     * @param topicName topic the message was published on
     * @param payload   raw message bytes
     * @param qos       quality of service the message was delivered with
     * @param retained  whether the broker flagged the message as retained
     */
    public void publishArrived(String topicName, byte[] payload, int qos, boolean retained) {
        // A missing payload is shown as an empty string rather than failing the callback.
        String s = (payload == null) ? "" : new String(payload, UTF_8);

        System.out.println("topicName:" + topicName);
        System.out.println("payload:" + s);
        System.out.println("qos:"+qos);
        System.out.println("retained:"+retained);
    }

    /**
     * Publishes the device token to the {@code MQTT_CLIENT_ID + "/keepalive"} topic.
     *
     * @throws MqttException if the publish request fails
     */
    public void sendKeepAlive() throws MqttException {
        // publish to a keep-alive topic
        publishToTopic(MQTT_CLIENT_ID + "/keepalive", token);
    }
}

 

PushService.java


import com.ibm.mqtt.MqttException;

/**
 * Console push service: opens an {@link MQTTConnection} to the configured
 * broker for a single device id.
 *
 * <p>Most of the logic is borrowed from KeepAliveService:
 * http://code.google.com/p/android-random/source/browse/trunk/TestKeepAlive/src/org/devtcg/demo/keepalive/KeepAliveService.java?r=219
 */
public class PushService
{
    // the port at which the broker is running.
    private static final int MQTT_BROKER_PORT_NUM = 1883;
    // the IP address, where your MQTT broker is running.
    private static final String MQTT_HOST = "127.0.0.1";

    // This is the instance of an MQTT connection; null while not connected.
    private MQTTConnection mConnection;

    // Per-instance device id. This was a static field, which meant that creating
    // a second PushService silently changed the id used by the first one.
    private final String deviceId;

    /**
     * @param token device id used to build the MQTT client id and topic;
     *              when {@code null}, {@link #start()} reports the problem
     *              and does not connect
     */
    public PushService(String token)
    {
        deviceId = token;
    }

    /** Starts the service by establishing the MQTT connection. */
    public synchronized void start() {
        System.out.println("Starting service...");

        // Establish an MQTT connection
        connect();
    }

    /**
     * Disconnects and discards the MQTT connection if there is one.
     * Safe to call when the service was never started or is already stopped.
     */
    public synchronized void stop() {
        // Destroy the MQTT connection if there is one
        if (mConnection != null) {
            mConnection.disconnect();
            mConnection = null;
        }
    }

    /**
     * Creates the MQTT connection for this instance's device id. A connection
     * failure is reported on standard output; no reconnect is scheduled.
     */
    private synchronized void connect() {
        System.out.println("Connecting...");
        // Create a new connection only if the device id is not NULL
        if (deviceId == null) {
            System.out.println("Device ID not found.");
            return;
        }

        try {
            mConnection = new MQTTConnection(MQTT_HOST, MQTT_BROKER_PORT_NUM, deviceId);
        } catch (MqttException e) {
            // Only report the failure; mConnection keeps its previous value.
            System.out.println("MqttException: " + (e.getMessage() != null ? e.getMessage() : "NULL"));
        }
    }
}

 

MQTT Server則到

http://mqtt.org/software 下載你想要的

而管理介面則去抄他的

http://github.com/tokudu/PhpMQTTClient 即可