DAT110 - Project 2: Publish-subscribe messaging middleware

Organisation

Weeks 9 and 10 are devoted to project work which is to be undertaken in groups of 2-4 students. Discussions among the groups are allowed, but the code handed in by the group should be the work of the group members - and not members of other groups.

There will be no lectures on Wednesday 26/2, but there will be labs at the normal time-slots in week 9. You are also encouraged to use the discussion forum in Canvas throughout the project weeks.

Overview

The aim of the project is to implement a publish-subscribe messaging-oriented middleware (PB-MOM) on top of the TCP-based message transport layer from project 1. You are not required to implement the message transport layer yourself, as an implementation of it is provided as part of the start code. There should not be any need to use TCP/UDP transport services and socket programming directly; they are used only indirectly via the provided message transport service implementation.

You are assumed to have read Chapter 4 (Communication) in the distributed systems book and be familiar with the concepts of publisher clients, subscriber clients, topics, and brokers. You are also assumed to be familiar with the service provided by the message transport layer that we implemented as part of project 1.

The client-side of the PB-MOM consists of publishers and subscribers that can create/delete topics, subscribe/unsubscribe to topics, and publish messages to topics. When a publisher publishes a message on a given topic, all currently connected clients subscribing to the topic are to receive the message.

The figure below gives an overview of the PB-MOM that is to be implemented:

The server-side consists of a broker that manages the connected clients, topics and subscriptions, and which acts as an intermediary responsible for publishing messages to the subscribers of a given topic.

The project is comprised of the following main tasks:

Task A. Implement classes for the messages to be used in the publish-subscribe protocol between clients and the broker.

Task B. Implement the storage of topics and subscriptions in the broker, and the processing of publish-subscribe messages received from connected clients.

Task C. Application of the PB-MOM for implementing a small IoT system in which a sensor publishes the current temperature on a temperature topic to which a display is subscribing (see also lab-exercises from earlier weeks and project 1).

Task D. Experiment with PB-MOM for implementing the ChApp (Chat Social Network Application) where users can send short messages to each other via topics, similar to what is found in many social network applications.

Task E. Extend the broker such that if a subscribing client is currently disconnected and later reconnects, then the client will be provided with the messages that may have been published on the topic while the client was disconnected.

Task F. Extend the broker from being single-threaded to being multi-threaded having a thread for handling each connected client.

It is only required to do one of the tasks E or F - and not both.

Getting Started

You should start by cloning the Java code which can be found in the github repository

https://github.com/selabhvl/dat110-project2-startcode.git

which contains an Eclipse-project with start-code.

NOTE When opening the project in Eclipse, there will be some compile-errors. These will go away as you complete the implementation of the tasks below.

In order for the group to use their own git-repository for the further work on the codebase, one member of the group must create an empty repository on github/bitbucket without a README file and without a .gitignore file, and then perform the following operations:

git remote remove origin

git remote add origin <url-to-new-empty-repository>

git push -u origin master

The other group members can now clone this new repository and work with a shared repository as usual.

In addition, you should also clone the following project:

https://github.com/selabhvl/dat110-project2-testing

which contains a number of unit tests that can be used for some basic testing of the implemented functionality. These tests are by no means complete, and when running the tests you should also check in the Eclipse console that no exceptions are raised.

Task A: Publish-subscribe Protocol Messages

The messages to be exchanged between the clients and the broker are to be defined as classes in the no.hvl.dat110.messages package. The base message class is Message and all message classes must be subclasses of this class. All messages will contain information about a user and have a type as defined in MessageType.java. The user is assumed to uniquely identify a connected client.

The communication between the client and the broker is to be based on the message transport layer/service implemented as part of project 1. An implementation of this layer is provided as part of the start-code in the no.hvl.dat110.messagetransport package.

The no.hvl.dat110.messages package already contains classes implementing the following messages for the publish-subscribe protocol:

  • ConnectMsg.java - sent by the client as the first message after having established the underlying message transport connection to the broker.

  • DisconnectMsg.java - sent from the client in order to disconnect from the broker.

You are required to complete the implementation of the remaining message-classes.

  • CreateTopicMsg.java - sent by the client in order to have the broker create a topic. A topic is to be identified by means of a String.

  • DeleteTopicMsg.java - sent by the client in order to have a topic deleted.

  • SubscribeMsg.java - sent by the client in order to subscribe to a topic.

  • UnsubscribeMsg.java - sent by the client in order to unsubscribe from a topic.

  • PublishMsg.java - sent by the client in order to publish a message (String) on a topic and sent by the broker in order to deliver the message to subscribed clients.

You must determine what object variables are needed in the classes. The message-classes must have a constructor that can give a value to all object-variables, getter/setter methods for all object-variables, and they must implement a toString-method to be used for logging purposes.
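
As an illustration of these requirements, a message class might be shaped roughly as follows. This is a minimal sketch and not the reference solution: the Message base class and the MessageType enum below are simplified stand-ins for the ones in the start code (whose constructors and fields may differ), and ExampleTopicMsg is a hypothetical name. The toString format is modelled on the log output shown under Task C.

```java
// Simplified stand-ins for the start-code types; the real Message and
// MessageType in no.hvl.dat110.messages may differ in detail.
enum MessageType { CONNECT, DISCONNECT, CREATETOPIC, DELETETOPIC, SUBSCRIBE, UNSUBSCRIBE, PUBLISH }

class Message {
    private MessageType type;
    private String user;

    Message(MessageType type, String user) {
        this.type = type;
        this.user = user;
    }

    MessageType getType() { return type; }
    String getUser() { return user; }

    @Override
    public String toString() {
        return "Message [type=" + type + ", user=" + user + "]";
    }
}

// Hypothetical message class carrying one extra object variable: the topic.
class ExampleTopicMsg extends Message {
    private String topic;

    // The constructor gives a value to every object variable.
    ExampleTopicMsg(String user, String topic) {
        super(MessageType.CREATETOPIC, user);
        this.topic = topic;
    }

    String getTopic() { return topic; }
    void setTopic(String topic) { this.topic = topic; }

    // Used for logging: own fields first, then the base-class fields.
    @Override
    public String toString() {
        return "ExampleTopicMsg [topic=" + topic + "]" + super.toString();
    }
}

class MessageSketchDemo {
    public static void main(String[] args) {
        System.out.println(new ExampleTopicMsg("display", "temperature"));
    }
}
```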

There are no tests available for testing your implementation of the message-classes, but the classes will be tested as part of the tests in Task B below.

Task B: Broker Implementation

The implementation of the broker can be found in the no.hvl.dat110.broker package.

You will have to study the code of the broker, which consists of the following classes:

  • ClientSession.java used to represent a session with a currently connected client on the broker side. Whenever a client (user) connects, a corresponding ClientSession-object will be created on the broker-side encapsulating the underlying message transport connection.

  • Storage.java which is to implement the storage of currently connected clients and manage the subscription of clients (users) to topics. You will complete the implementation of this class in Task B.1 below.

  • Broker.java implementing a Stopable-thread abstraction. The doProcess-method of the broker runs in a loop accepting incoming message transport connections (sessions) from clients.

  • Dispatcher.java implementing a Stopable-thread that is responsible for processing the messages received from clients. The doProcess-method of the dispatcher checks (polls) the client sessions for incoming messages and then invokes the dispatch-method which, depending on the type of the received message, invokes the corresponding handler method. You will complete the implementation of the dispatcher in Task B.2 below.

  • BrokerServer.java which contains the main-method of the broker. It is responsible for starting up the server and creating the storage and dispatcher of the broker.

The figure below gives an overview of the implementation of the BrokerServer. The Broker uses an underlying MessagingServer (from the messaging layer) to receive new message connections from clients. It then hands off these connections to the Dispatcher which is responsible for processing incoming messages on the connections using the information stored in the Storage.

Both the Broker and the Dispatcher run as Stopable-threads as implemented by the Stopable-class in Stopable.java:

public abstract class Stopable extends Thread {

	private boolean stop = false;
	protected String name;

	public Stopable(String name) {
		this.name = name;
	}

	public synchronized void doStop() {
		stop = true;
	}

	private synchronized boolean doCont() {
		return !stop;
	}

	public abstract void doProcess();

	public void run() {

		Logger.log(name + " running");

		while (doCont()) {
			doProcess();		
		}

		Logger.log(name + " stopping");

	}
}
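
To illustrate how a Stopable-thread is used, the sketch below defines a hypothetical subclass, TickCounter, that counts how many times doProcess is invoked before the thread is asked to stop. So that it compiles on its own, it includes a trimmed copy of the class above in which the Logger calls are replaced by System.out.println; the names StopableSketch, TickCounter and StopableDemo are assumptions made for this example only.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Trimmed copy of the Stopable class above (Logger replaced by System.out).
abstract class StopableSketch extends Thread {
    private boolean stop = false;
    protected String name;

    StopableSketch(String name) { this.name = name; }

    public synchronized void doStop() { stop = true; }

    private synchronized boolean doCont() { return !stop; }

    public abstract void doProcess();

    @Override
    public void run() {
        System.out.println(name + " running");
        while (doCont()) {
            doProcess();
        }
        System.out.println(name + " stopping");
    }
}

// Hypothetical subclass: each doProcess call performs one short unit of work.
class TickCounter extends StopableSketch {
    final AtomicInteger ticks = new AtomicInteger();

    TickCounter() { super("TickCounter"); }

    @Override
    public void doProcess() {
        ticks.incrementAndGet();
        try {
            Thread.sleep(10); // keep iterations short so a stop request is noticed quickly
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class StopableDemo {
    // Starts the counter, waits for at least one iteration, then stops it.
    static int runOnce() {
        TickCounter counter = new TickCounter();
        counter.start();
        try {
            while (counter.ticks.get() == 0) {
                Thread.sleep(5);
            }
            counter.doStop(); // request termination
            counter.join();   // wait until run() has returned
        } catch (InterruptedException e) {
            counter.doStop();
            Thread.currentThread().interrupt();
        }
        return counter.ticks.get();
    }

    public static void main(String[] args) {
        System.out.println("iterations: " + runOnce());
    }
}
```

Since the stop flag is only checked between calls to doProcess, a doProcess implementation that blocks indefinitely would prevent the thread from stopping.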

Task B.1 Broker Storage

The Storage-class of the broker implements an in-memory storage where the broker can store information about connected clients and the subscription of users (clients) to topics. The start of the class is already provided:

public class Storage {

	private ConcurrentHashMap<String, Set<String>> subscriptions;
	private ConcurrentHashMap<String, ClientSession> clients;

	public Storage() {
		subscriptions = new ConcurrentHashMap<String, Set<String>>();
		clients = new ConcurrentHashMap<String, ClientSession>();
	}
 [...]

The basic idea is to use a hash-map mapping from topics (String) to a set of users (String) for managing which users are subscribed to which topics. Similarly, the currently connected clients are stored in a hash-map mapping from a user (String) to a ClientSession-object representing the connection/session with the client.

The broker data model for the storage is illustrated below

You are required to complete the implementation of the following methods in Storage.java

  • public void addClientSession(String user, Connection connection)

  • public void removeClientSession(String user)

  • public void createTopic(String topic)

  • public void deleteTopic(String topic)

  • public void addSubscriber(String user, String topic)

  • public void removeSubscriber(String user, String topic)

  • public Set<String> getSubscribers(String topic)

The TODO-comments in the Storage.java class provide more detailed information about what the individual methods are supposed to do.
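
The topic-to-subscribers map can be sketched in isolation as follows. This is a simplified illustration under assumptions, not the required implementation: the class name SubscriptionSketch is hypothetical, client sessions are left out entirely, and choices such as silently ignoring a subscription to an unknown topic must be checked against the TODO-comments and the unit tests.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, stripped-down stand-in for the subscription part of Storage.
class SubscriptionSketch {
    // topic -> set of subscribed users
    private final ConcurrentHashMap<String, Set<String>> subscriptions = new ConcurrentHashMap<>();

    void createTopic(String topic) {
        // A concurrent set, so that subscribers can be added from several threads.
        subscriptions.putIfAbsent(topic, ConcurrentHashMap.newKeySet());
    }

    void deleteTopic(String topic) {
        subscriptions.remove(topic);
    }

    void addSubscriber(String user, String topic) {
        Set<String> users = subscriptions.get(topic);
        if (users != null) { // assumption: subscriptions to unknown topics are ignored
            users.add(user);
        }
    }

    void removeSubscriber(String user, String topic) {
        Set<String> users = subscriptions.get(topic);
        if (users != null) {
            users.remove(user);
        }
    }

    Set<String> getSubscribers(String topic) {
        return subscriptions.get(topic); // null if the topic does not exist
    }

    public static void main(String[] args) {
        SubscriptionSketch s = new SubscriptionSketch();
        s.createTopic("temperature");
        s.addSubscriber("display", "temperature");
        System.out.println(s.getSubscribers("temperature"));
    }
}
```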

The package no.hvl.dat110.broker.storage.tests contains some basic unit tests that can be used to test the implementation of the storage methods.

Task B.2 Broker Dispatcher for Message Processing

All communication between the broker and the connected clients will be done via the send, receive, and hasData-methods of the corresponding ClientSession-object. The encapsulation of the underlying message transport connection has already been implemented in the ClientSession.java class.

The messages exchanged between the broker and the client will be a JSON-representation of the objects of the message-classes implemented in Task A. As an example, a ConnectMsg-object will be represented as follows:

{"type":"CONNECT","user":"testuser"}

The conversion to/from the JSON format has already been implemented using the gson library in the MessageUtils.java class.

The aim of this task is to implement the broker-side processing of the messages received from clients in the Dispatcher.java class. The doProcess-method of the dispatcher runs in a loop where it checks each of the current client sessions in turn for an incoming message using the hasData-method. If the client has sent a message, it invokes the dispatch-method, which in turn invokes a method named onX for processing a message of type X.
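
The dispatch step described above can be pictured as a switch on the message type. The sketch below is a simplified illustration and not the code in Dispatcher.java: the MsgType enum, the DispatchSketch class, and the handlers that merely record which method was invoked are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MessageType; the names follow the log output under Task C.
enum MsgType { CONNECT, DISCONNECT, CREATETOPIC, DELETETOPIC, SUBSCRIBE, UNSUBSCRIBE, PUBLISH }

class DispatchSketch {
    // Records which handler ran, so that the routing can be inspected.
    final List<String> trace = new ArrayList<>();

    // Routes a message to the handler matching its type (the onX naming scheme).
    void dispatch(MsgType type, String user) {
        switch (type) {
            case CONNECT:     onConnect(user);     break;
            case DISCONNECT:  onDisconnect(user);  break;
            case CREATETOPIC: onCreateTopic(user); break;
            case DELETETOPIC: onDeleteTopic(user); break;
            case SUBSCRIBE:   onSubscribe(user);   break;
            case UNSUBSCRIBE: onUnsubscribe(user); break;
            case PUBLISH:     onPublish(user);     break;
            default: throw new IllegalArgumentException("Unknown type: " + type);
        }
    }

    // Placeholder handlers; real handlers would update the storage and send messages.
    void onConnect(String user)     { trace.add("onConnect:" + user); }
    void onDisconnect(String user)  { trace.add("onDisconnect:" + user); }
    void onCreateTopic(String user) { trace.add("onCreateTopic:" + user); }
    void onDeleteTopic(String user) { trace.add("onDeleteTopic:" + user); }
    void onSubscribe(String user)   { trace.add("onSubscribe:" + user); }
    void onUnsubscribe(String user) { trace.add("onUnsubscribe:" + user); }
    void onPublish(String user)     { trace.add("onPublish:" + user); }

    public static void main(String[] args) {
        DispatchSketch d = new DispatchSketch();
        d.dispatch(MsgType.CONNECT, "display");
        d.dispatch(MsgType.SUBSCRIBE, "display");
        System.out.println(d.trace);
    }
}
```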

The dispatcher contains an implementation of the onConnect and onDisconnect-methods. Your task is to complete the implementation of the remaining methods in Dispatcher.java

  • public void onCreateTopic(CreateTopicMsg msg)

  • public void onDeleteTopic(DeleteTopicMsg msg)

  • public void onSubscribe(SubscribeMsg msg)

  • public void onUnsubscribe(UnsubscribeMsg msg)

  • public void onPublish(PublishMsg msg)

in order to be able to also process the remaining types of messages.
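
For onPublish, the essential step is to look up the subscribers of the topic and forward the message to each one that is currently connected. The following is a minimal sketch of that fan-out under stated assumptions: the Outbox interface stands in for the send-method of ClientSession, messages are plain strings rather than PublishMsg-objects, and FanOutSketch and its fields are hypothetical names rather than parts of the start code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the send-method of a ClientSession.
interface Outbox {
    void send(String message);
}

class FanOutSketch {
    final Map<String, Set<String>> subscriptions = new ConcurrentHashMap<>(); // topic -> users
    final Map<String, Outbox> clients = new ConcurrentHashMap<>();            // user -> session

    // Delivers the message to every connected subscriber; returns the number of deliveries.
    int publish(String topic, String message) {
        Set<String> users = subscriptions.get(topic);
        if (users == null) {
            return 0; // unknown topic: nothing to deliver
        }
        int delivered = 0;
        for (String user : users) {
            Outbox session = clients.get(user);
            if (session != null) { // a subscriber may currently be disconnected
                session.send(message);
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        FanOutSketch broker = new FanOutSketch();
        List<String> shown = new ArrayList<>();
        broker.subscriptions.put("temperature", ConcurrentHashMap.newKeySet());
        broker.subscriptions.get("temperature").add("display");
        broker.clients.put("display", shown::add);
        broker.publish("temperature", "20");
        System.out.println(shown);
    }
}
```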

The tests found in the no.hvl.dat110.broker.processing.tests package can be used to test the implemented methods. Please note that the tests in the package have to be run one at a time, as they all use the same TCP/IP port for the broker.

Task C: IoT sensor-display application

In this task you will use the PB-MOM middleware to implement a small IoT system comprised of a (temperature) sensor, and a display.

The start of the implementation of the IoT-system can be found in the no.hvl.dat110.iotsystem package.

The class Client.java contains an implementation of the methods needed for implementing a client that can connect to the broker. The class Common.java contains the port number that can be used for connecting clients to the broker server.

The principle of the sensor-display application is shown below

Sensor device implementation

The skeleton of the sensor device implementation can be found in the SensorDevice.java class. You are required to complete the implementation such that the sensor device connects to a broker and then runs in a loop COUNT times, publishing to a temperature topic in each iteration. After that, the sensor device should disconnect from the broker.

Display device implementation

The skeleton of the display device implementation can be found in the DisplayDevice.java class. You are required to complete the implementation of the display device such that it connects to the same broker as the sensor device, creates a temperature topic, subscribes to this topic and then receives the same number of messages as the sensor device is sending on the topic. Upon completion, the display device should disconnect from the broker.

Testing the IoT sensor-display system

Try to start a broker and have the display device and then the sensor device connect. Check that the display device is correctly receiving the temperature-messages published by the sensor device.

The test in the package no.hvl.dat110.iotsystem.tests can be used to run the IoT system. When running the test you should see output similar to:

IoT system starting ...
Starting broker ...
Broker server : 8080
Dispatcher running
.Broker running
Broker accept [0]
....Starting display ...
Display starting ...
.!0
?
Message [type=CONNECT, user=display]
onConnect:Message [type=CONNECT, user=display]
Client sessions:1
Broker accept [0]
.?
CreateTopicMsg [topic=temperature]Message [type=CREATETOPIC, user=display]
onCreateTopic:CreateTopicMsg [topic=temperature]Message [type=CREATETOPIC, user=display]
Topic : 1
.?
SubscribeMsg [topic=temperature]Message [type=SUBSCRIBE, user=display]
onSubscribe:SubscribeMsg [topic=temperature]Message [type=SUBSCRIBE, user=display]
Subscribers : temperature : 1
..Starting sensor ...
temperature device started
!0
?
READING: 2
Message [type=CONNECT, user=temperaturesensor]
onConnect:Message [type=CONNECT, user=temperaturesensor]
Client sessions:2
Broker accept [0]
.?
PublishMsg [topic=temperature, message=2]Message [type=PUBLISH, user=temperaturesensor]
onPublish:PublishMsg [topic=temperature, message=2]Message [type=PUBLISH, user=temperaturesensor]
DISPLAY: 2
....READING: 20
.?

[ ... ]

.Display stopping ...
.?
UnsubscribeMsg [topic=temperature]Message [type=UNSUBSCRIBE, user=display]
onUnsubscribe:UnsubscribeMsg [topic=temperature]Message [type=UNSUBSCRIBE, user=display]
Subscribers : temperature : 0
.?
Message [type=DISCONNECT, user=display]
onDisconnect:Message [type=DISCONNECT, user=display]
Client sessions:1
.Temperature device stopping ...
IoT system stopping ...

Task D: ChApp - Chat social network application

The purpose of this task is to connect multiple JavaFX-based GUI clients to a broker, and in this way implement a short messaging system. The figure below shows a screenshot of the client. The application client makes it possible to connect to a broker, create/delete topics, subscribe/unsubscribe to topics, and to publish messages on topics.

The architecture of the chat application system is shown below

and the GUI of the chat application client is the following

A demonstration of the application can be found here: https://www.youtube.com/watch?v=qGibmzlm0x0&feature=youtu.be

Task D.1 Setup JavaFX and PB-MOM

Clone the implementation of the ChApp-client which is available as an Eclipse-project from here:

https://github.com/selabhvl/dat110-project2-chapp.git

If using Java 11 SDK (or later), then you will have to download JavaFX for your platform and then configure the project. For Java 8/9/10 JavaFX is included as part of JDK.

  1. Download the 11.0.2 distribution from https://gluonhq.com/products/javafx/ (remember to download for the correct platform - Mac/Linux/Windows)

  2. Follow the instructions for JavaFX and Eclipse for non-modular projects: https://openjfx.io/openjfx-docs/#install-javafx (except that you do not need to create a new project as you already have the dat110-project2-chapp project). The main class for the launch configuration is no.hvl.dat110.chapp.Chapp

  3. In order to compile the chatapp client you will in addition have to add the Eclipse project containing your implementation of the PB-MOM middleware to the Build Path of the project for the chat application GUI client.

Task D.2 Running the chat application

Start by testing the system by running the broker and two clients on the same machine. The broker will run on the TCP/IP port specified in the class BrokerServer.java and you start the broker by running the main-method in this class. Try creating topics and then publish some messages.

Next, start a broker on one machine and let each group member run the ChApp-client on their machine. If you are not able to connect to the broker it may be due to firewall issues on the host running the broker or the client. Make sure that the port on which the broker is running is not blocked by the firewall.

Task E: Message Buffering

Please note that you only need to do either Task E or Task F - not both.

When a client disconnects from the broker, the corresponding ClientSession-object is removed from the storage. This means that if the client is subscribing to a topic and messages are published on that topic while the client is disconnected, then the client will not receive the published messages. If the client later reconnects, it will only receive those messages that were published after the reconnect.

The aim of this task is to extend the implementation of the broker such that the broker will buffer any messages for a subscribed client until the point where the client connects again. At that point, the broker should then publish the buffered messages to the client. Implementing this extension will involve:

  • changing the implementation of how a disconnect-message from the client is processed by the dispatcher.
  • augmenting the broker storage such that buffering of messages for the clients becomes possible.
  • changing the implementation of how a connect from a client is handled by the dispatcher.
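
One possible shape for the buffering part of the storage is a per-user queue of undelivered messages. The sketch below is an illustration under assumptions rather than a prescribed design: BufferSketch and its method names are hypothetical, messages are plain strings, actual sending is omitted, and it leaves open questions such as whether buffers should be bounded or how subscriptions are retained across a disconnect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical bookkeeping for messages published while a subscriber is away.
class BufferSketch {
    private final Set<String> connected = ConcurrentHashMap.newKeySet();
    private final Map<String, Queue<String>> buffers = new ConcurrentHashMap<>();

    // On connect: mark the user as connected and return what was buffered meanwhile.
    List<String> connect(String user) {
        connected.add(user);
        List<String> pending = new ArrayList<>();
        Queue<String> buffer = buffers.remove(user);
        if (buffer != null) {
            pending.addAll(buffer); // preserves publication order
        }
        return pending;
    }

    // On disconnect: the user stays known to the broker but is no longer reachable.
    void disconnect(String user) {
        connected.remove(user);
    }

    // Returns true if the message can be sent right away, false if it was buffered.
    boolean deliver(String user, String message) {
        if (connected.contains(user)) {
            return true; // a real broker would send on the client session here
        }
        buffers.computeIfAbsent(user, u -> new ConcurrentLinkedQueue<>()).add(message);
        return false;
    }

    public static void main(String[] args) {
        BufferSketch broker = new BufferSketch();
        broker.connect("display");
        broker.disconnect("display");
        broker.deliver("display", "21");
        System.out.println(broker.connect("display"));
    }
}
```

In this sketch the check for a connected user and the buffering of the message are two separate steps; a real implementation would have to consider what happens if a client reconnects between them.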

You may use the ChApp-application to test the buffering implementation or alternatively write a unit test similar to the ones found in the no.hvl.dat110.broker.processing.tests package to create a scenario where a client (subscriber) disconnects for a while and then reconnects.

Task F: Multi-threaded Broker

Please note that you only need to do either Task E or Task F - not both.

The implementation of the dispatcher in the Dispatcher.java class runs as a single Stopable-thread which checks each of the current client sessions in turn for incoming messages using the hasData-method. This means that it is not possible to exploit multiple cores when running the broker, and this may degrade the performance of the broker as perceived by the clients.

The aim of this task is to change the implementation of the dispatcher such that each client session has an associated thread which processes the incoming messages from the corresponding client.

Solving this task means that a new thread has to be spawned whenever a client connects. This thread will then wait for incoming messages from the client and handle these accordingly. When the client disconnects, the corresponding thread should be terminated. It should also be possible to stop/terminate the execution of all current threads. In the current implementation, the single-threaded dispatcher can be stopped by invoking the doStop-method on the dispatcher.
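
A thread-per-client structure could be sketched as follows. This is an illustration under assumptions, not the required design: a BlockingQueue stands in for the client's message transport connection, messages are plain strings, and ClientWorker, WorkerDemo and their members are hypothetical names. Polling with a timeout lets the worker notice a stop request even when no messages arrive.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical per-client worker; the queue stands in for the client connection.
class ClientWorker extends Thread {
    final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();
    private volatile boolean stopped = false;

    // Allows the broker to terminate the worker from another thread.
    void doStop() { stopped = true; }

    @Override
    public void run() {
        while (!stopped) {
            try {
                // Wait briefly for a message, then re-check the stop flag.
                String msg = incoming.poll(50, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    continue;
                }
                if (msg.equals("DISCONNECT")) {
                    break; // the client left: this thread terminates
                }
                handled.add(msg); // a real worker would dispatch the message here
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}

class WorkerDemo {
    // Spawns one worker, feeds it three messages and waits for it to finish.
    static List<String> runOnce() {
        ClientWorker worker = new ClientWorker();
        worker.start();
        worker.incoming.add("SUBSCRIBE");
        worker.incoming.add("PUBLISH");
        worker.incoming.add("DISCONNECT");
        try {
            worker.join();
        } catch (InterruptedException e) {
            worker.doStop();
            Thread.currentThread().interrupt();
        }
        return worker.handled;
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

With several such workers running concurrently, the storage is accessed from multiple threads, which is one reason the provided Storage-class is based on ConcurrentHashMap.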

Handing in the project

Each group must hand in a link on Canvas to a git-repository containing their implementation.

Please remember to hand in as a member of a group in Canvas: https://hvl365-my.sharepoint.com/:w:/g/personal/akv_hvl_no/EdkQXNKVjmhPrHNtD3n5r74B6KSb7DwmVYf9MA3SIUA4Sw?e=hC5Q9i
