How stuff works
Webmachine is a RESTful API toolkit written in Erlang, built on top of the excellent bit-pushing and HTTP syntax-management provided by mochiweb. Webmachine makes it easier for us to integrate a RESTful API into sensor-cloud.
- Webmachine GitHub Repository can be found here.
An application in Webmachine is called a resource. The resource contains the source code of your application, and together with Webmachine's API you can modify the behavior of your application based on the HTTP method of a request as well as on other HTTP options. Webmachine provides a toolkit of functions that can be used in the resource; more detailed documentation of these functions can be found here.
A Webmachine application uses URI dispatching to resources. This makes it possible to route different URI paths to different resources. The dispatch configuration is done in priv/dispatch.conf.
The dispatch map data structure is a list of 3-tuples of the form {pathspec, resource, args}.
- pathspec is a list of strings that specifies the URI path to match against the URI of a request.
- resource is the name of the Webmachine resource that is supposed to handle HTTP requests to the corresponding pathspec.
- args is a list of arguments that can be passed to the resource upon execution; in most cases it is not needed.
Regarding the pathspec, wildcards may be used together with tokens, which are enclosed in single quotes (for example 'id').
Tokens bind the matching part of the path to a name that the resource can use. The result is a list of key-value tuples where the key is an atom and the value is a string.
    {["streams", 'id'], streams, []}.
    {["streams"], streams, []}.
    {["resources"], resource, []}.
In this example we can see how different path specifications map to the same or to different resources. HTTP requests made to srv:8000/streams will be handled by the streams.erl resource, while requests made to srv:8000/resources will be handled by resource.erl. The first row shows how we can use URI information in our resources. A request could be made to srv:8000/streams/1, and the value of id can then be extracted by the streams resource using the Webmachine API function wrq:path_info, which gives us a list of key-value tuples. In this case we would get [{id, "1"}], where the key is an atom and the value is a string.
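To make the token binding concrete, here is a minimal sketch of how a pathspec could be matched against the segments of a request path. It is a simplified illustration and not Webmachine's actual dispatcher; the module name dispatch_sketch and the function match/2 are hypothetical.

```erlang
-module(dispatch_sketch).
-export([match/2]).

%% match(PathSpec, Segments) -> {ok, Bindings} | nomatch
%% PathSpec is a list of strings (literal segments) and atoms (tokens).
%% The atom '*' as the last element matches any remaining segments.
%% This is an illustration only, not Webmachine's own implementation.
match(PathSpec, Segments) ->
    match(PathSpec, Segments, []).

%% Both lists consumed: the path matches.
match([], [], Acc) ->
    {ok, lists:reverse(Acc)};
%% Trailing wildcard: accept whatever is left.
match(['*'], _Rest, Acc) ->
    {ok, lists:reverse(Acc)};
%% Token: bind the atom to the current path segment.
match([Token | Spec], [Seg | Segs], Acc) when is_atom(Token) ->
    match(Spec, Segs, [{Token, Seg} | Acc]);
%% Literal: the segment must be equal to the string in the pathspec.
match([Seg | Spec], [Seg | Segs], Acc) ->
    match(Spec, Segs, Acc);
match(_, _, _) ->
    nomatch.
```

With the dispatch entries above, dispatch_sketch:match(["streams", id], ["streams", "1"]) binds id to "1", which is the same shape of result that wrq:path_info hands to the resource.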
A table with different examples can be found here.
A Webmachine application is configured by defining a supervisor for the resource. If the name of our resource is resource.erl, then the supervisor would be named resource_sup.erl. The configuration consists of a list of tuples. Once the desired configuration options have been defined, the next step is to create a child spec to give to an Erlang supervisor process.
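    init([]) ->
        %% Dispatch is assumed to be read from priv/dispatch.conf, relative to
        %% the directory the node is started from.
        {ok, Dispatch} = file:consult("priv/dispatch.conf"),
        WebConfig = [
                     {ip, "192.168.1.1"},
                     {port, 8000},
                     {dispatch, Dispatch}],
        Web = {webmachine_mochiweb,
               {webmachine_mochiweb, start, [WebConfig]},
               permanent, 5000, worker, dynamic},
        Processes = [Web],
        {ok, { {one_for_one, 10, 10}, Processes} }.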
This is an example of a resource supervisor. We begin by defining the desired configuration, such as IP address and port number, as a list of tuples inside WebConfig. After that we create a child spec to give to an Erlang supervisor process.
A list of different configuration options can be found here.
We are using Elasticsearch as our datastore. Elasticsearch works like a document datastore that you connect to through a RESTful API. To create (index) a new document in Elasticsearch, you send a POST request with a JSON object. Likewise, when you GET a document you receive a JSON object. All entities in our system are stored as separate documents. Relations are not handled by Elasticsearch in any way; an entity simply stores the id of its "owner". For example, a stream has the id of the resource that it belongs to.
A "flexible and powerful open source, distributed real-time search and analytics engine for the cloud" - http://www.elasticsearch.org/
Elasticsearch is running with the default options. This means you can send HTTP requests to it on port 9200. See Setup for installation instructions.
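As a rough illustration of indexing a document over HTTP, the sketch below builds a POST request for a stream document that carries the id of its owning resource, and sends it with the httpc client from OTP's inets application. The module name es_sketch, the index name "sensorcloud" and the hand-written JSON body are assumptions made for this example; they are not taken from the project's code.

```erlang
-module(es_sketch).
-export([index_request/3, index_stream/2]).

%% Base URL of a local Elasticsearch node on the default port.
-define(ES_URL, "http://localhost:9200").
%% Hypothetical index name; the real one is not given in this document.
-define(INDEX, "sensorcloud").

%% Build the httpc request tuple for indexing a document of type Type.
%% The document only contains the owner id and a name.
index_request(Type, ResourceId, Name) ->
    Url = ?ES_URL ++ "/" ++ ?INDEX ++ "/" ++ Type,
    %% Hand-written JSON: the inputs are assumed to need no escaping.
    Body = "{\"resource_id\":\"" ++ ResourceId ++
           "\",\"name\":\"" ++ Name ++ "\"}",
    {Url, [], "application/json", Body}.

%% Send the request. Requires the inets application and a running
%% Elasticsearch node; returns whatever httpc:request/4 returns.
index_stream(ResourceId, Name) ->
    {ok, _} = application:ensure_all_started(inets),
    httpc:request(post, index_request("stream", ResourceId, Name), [], []).
```

Keeping the request construction in a pure function makes it possible to inspect the URL and body without a running Elasticsearch node.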
Configuration of our elasticsearch server can be found here: https://github.com/projectcs13/elasticsearch/blob/configured/config/elasticsearch.yml
Basically, the only thing we have changed is the cluster name.
The following entities exist within our system. Each of these entities is a special type of Elasticsearch document.
Fields currently in a user:
- username
Fields currently in a resource:
- user_id
- uuid
- name
- tags
- description
- type
- manufacturer
- streams
- uri
- polling_freq
- creation_date
A stream is a representation of a stream of data. A stream has meta-data associated with it. The actual data is saved as individual datapoints.
Fields currently in a stream:
- resource_id
- name
- tags
- description
- private
- type
- accuracy
- min_val
- max_val
- quality
- active
- user_ranking
- subscribers
- last_updated
- creation_date
- history_size
- location
Fields currently in a virtual stream:
- name
- description
- tags
- group
- private
- history_size
- last_updated
- creation_date
- function
Fields currently in a datapoint:
- timestamp
- value
- stream_id
Fields currently in a virtual stream datapoint:
- timestamp
- value
- stream_id
- Concepts
- Overview
- Clients
- Program Structure
- How to publish
- How to subscribe
- How to use a web-socket
- How to subscribe using JAVA
- RabbitMQ
##Concepts
- Streams. A stream is a flow of data. A user can subscribe to a stream.
- Virtual Streams. A virtual stream subscribes to one or several streams/virtual streams and processes incoming data according to a defined function, which could be a prediction or some other aggregation of data. A user can subscribe to a virtual stream.
- Prediction. Currently, the predictions are calculated using R.
- Data points. A data point is a data value for a stream/virtual stream. When the system receives a data point from an external resource in JSON format, the system parses it and transforms it into a data point.
##Overview
When a new data point arrives for a stream, it is published in the pub/sub system and distributed to all clients who have subscribed to the stream. The system also supports dynamic visualization and prediction. RabbitMQ has been utilized to implement the back-end messaging. Node.js and Socket.IO are used to interact with the web pages via web-sockets.
##Clients
- Web pages. The pub/sub system can push data to web pages, which enables dynamic visualization.
- Sessions. Inform a logged-on user about their subscriptions.
- Users. Inform logged-off users about their subscriptions.
##Program Structure
- virtual_stream_process_supervisor.erl. A supervisor that can dynamically create and terminate virtual stream processes, which handle data flowing into and out of the pub/sub system.
- gen_virtual_stream_process.erl. The implementation of the virtual stream process. It subscribes to the streams in question and can apply one of the functions min, max, avg or total to them as new data points arrive. It also has the function diff, which only works correctly when applied to a single stream.
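The aggregation functions named above can be sketched as a single dispatching function over a list of numeric values. This is a minimal sketch and not the code in gen_virtual_stream_process.erl: the module name vstream_functions and the function apply_function/2 are hypothetical, the list of values is assumed to be non-empty, and diff is left out because its exact semantics are not described here.

```erlang
-module(vstream_functions).
-export([apply_function/2]).

%% apply_function(Function, Values) -> number()
%% Values is assumed to be a non-empty list of numbers, for example the
%% latest value of each stream the virtual stream subscribes to.
apply_function(min, Values) ->
    lists:min(Values);
apply_function(max, Values) ->
    lists:max(Values);
apply_function(total, Values) ->
    lists:sum(Values);
apply_function(avg, Values) ->
    %% The / operator always returns a float in Erlang.
    lists:sum(Values) / length(Values).
```

Selecting the function with an atom keeps the virtual stream process itself generic: the same process loop can serve any of the supported aggregations.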
##How to publish
When a data point is published, it is sent to an exchange and from there routed to the queues bound to that exchange. This is an example of publishing to a fanout exchange in an Erlang module:
    %% Requires -include_lib("amqp_client/include/amqp_client.hrl").
    %% Connect
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = ?IP_ADDRESS}),
    %% Open channel
    {ok, ChannelOut} = amqp_connection:open_channel(Connection),
    %% Declare exchange
    amqp_channel:call(ChannelOut, #'exchange.declare'{exchange = ?EXCHANGE, type = <<"fanout">>}),
    %% Publish data point
    amqp_channel:cast(ChannelOut, #'basic.publish'{exchange = ?EXCHANGE}, #amqp_msg{payload = ?DATAPOINT}).
For further details, check the documentation here.
Check the documentation here for how to publish via Node.js using Rabbit.js.
##How to subscribe
In the system, each stream/virtual stream has its own exchange in RabbitMQ, and clients subscribe to these exchanges by binding queues to them. To subscribe, you need to connect to the exchange. This is an example of how to do it in an Erlang module:
    %% Requires -include_lib("amqp_client/include/amqp_client.hrl").
    %% Connect
    {ok, Connection} =
        amqp_connection:start(#amqp_params_network{host = ?IP_ADDRESS}),
    %% Open channel
    {ok, ChannelIn} = amqp_connection:open_channel(Connection),
    %% Declare the exchange and an exclusive queue, then bind the queue to the exchange
    amqp_channel:call(ChannelIn, #'exchange.declare'{exchange = ?EXCHANGE, type = ?TYPE}),
    #'queue.declare_ok'{queue = QueueIn} = amqp_channel:call(ChannelIn, #'queue.declare'{exclusive = true}),
    amqp_channel:call(ChannelIn, #'queue.bind'{exchange = ?EXCHANGE, queue = QueueIn}),
    %% Subscribe to INPUT queue
    amqp_channel:subscribe(ChannelIn, #'basic.consume'{queue = QueueIn, no_ack = true}, self()),
    receive
        {#'basic.deliver'{}, #amqp_msg{payload = Body}} ->
            %% Do something with the data
            Body
    end.
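For further details, check the documentation here.
How to subscribe via Node.js using Rabbit.js:
    // EXCHANGE is a placeholder for the name of the exchange to subscribe to
    var context = require('rabbit.js').createContext(); // connects to a local broker by default
    context.on('ready', function() {
        var sub = context.socket('SUB');
        sub.connect(EXCHANGE);
        sub.on('data', function(data) {
            // Do something with the data
        });
    });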
For further details, check the documentation here.
##How to use a web-socket
The system allows subscriptions via web-sockets, which are used to update web pages in real time.
Socket.IO is used to let a web page connect via a web-socket.
Here is an example of how to connect to a web-socket:
    // IP_ADDRESS and MSG_TYPE are placeholders for the server address and the event name
    var socket = io.connect(IP_ADDRESS);
    socket.on(MSG_TYPE, function(data) {
        // Do something with the data
    });
For further details, look in the documentation here.
Tip: If you want several clients to receive different kinds of data through one web-socket, consider using namespaces, a feature provided by Socket.IO.
##How to subscribe using JAVA
Subscribing in Java is rather straightforward using the RabbitMQ client. First include the following Maven artifact:
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.2.1</version>
    </dependency>
You can use the following snippet to subscribe asynchronously to stream updates. The EXCHANGE_NAME is "streams.streamid", where streamid should be replaced by the id of the stream.
    // Requires imports from com.rabbitmq.client.* and java.io.IOException
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost"); // listening on port 5672
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    boolean autoAck = true;
    channel.basicConsume(queueName, autoAck, "myConsumerTag",
        new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                //long deliveryTag = envelope.getDeliveryTag();
                // (process the message components here ...)
                //channel.basicAck(deliveryTag, false);
                // {"value":"10.9","timestamp":"2013-11-25T15:29:34.000","stream_id":"_9e-AzbuRPy_8eRD17IQPg"}
                String json = new String(body, "UTF-8");
            }
        });
##RabbitMQ
RabbitMQ is a message broker that provides robust messaging for many successful commercial websites, including Twitter. It runs on a majority of operating systems and is easy to use. RabbitMQ offers client libraries for many mainstream programming languages, including Erlang, which we are using in this project. RabbitMQ is based on the AMQP protocol, and the following concepts are essential:
- Queues. A queue buffers incoming messages, and a client can fetch messages from the queue.
- Exchange. An exchange is where data points arrive; it distributes them to the connected queues according to some rule.
- Publisher/Subscriber. A publisher is something that pushes data into the pub/sub system, and a subscriber, commonly known as a consumer, is something that listens for specific data that comes into the pub/sub system.
If you would like to know more about RabbitMQ, please visit its official website.
#Statistical Analysis
We have included functionality to do some statistical analysis on stream data. As we have connected the language R for statistical analysis, the possibilities for later expansion are very large.
##R
R is a programming language mainly for statistical analysis and is an open source dialect of S. It is a fairly well established language, so we can assume a good level of stability and reliability.
It is implemented in C/C++, and has a lot of external libraries that can easily be installed from within R itself.
For more information, look at the R-Project homepage.
##rErlang
Since the engine of SensorCloud is written in Erlang, we wanted to hook up R with Erlang somehow. While Erlang has good support for connecting to C applications, it would have meant quite a lot of extra work for us to do it ourselves. Luckily, there was already a project for this on R's homepage, called rErlang. It has an open SVN repository where we could get the necessary source. The project has not been updated since 2009, and it turned out to have several bugs in it. We uploaded the code to our own repository, both to be able to hook it up to our automatic download system for our dependencies, and to be able to make changes as we wanted to.
The library consists of two parts: one small Erlang part that uses Erlang's built-in functionality to connect it with the C side, and one fairly large C part. The library is meant to be able to both call R from Erlang and Erlang from R (not just responses), though we are only interested in calling R from Erlang. The C side communicates with Erlang (at least in part) by writing to stderr, while stdout is never emptied (one of the bugs we found was an overflow in the stdout buffer, since the code had prints to it).
rErlang starts an R process, which it communicates with using message passing. While it is written to only have one rErlang process running at any given time, that restriction can fairly easily be lifted.
We have written a wrapper for the API in Erlang. Extending it with new R functionality does not require a lot of work.
##Forecasting/Predictions
Prediction is the first (and probably most important) statistical analysis we implemented. It uses an implementation of the ARIMA method (AutoRegressive Integrated Moving Average) in R to make a prediction. The method as such seems to be very well established for time series forecasting. While it could be done for an arbitrary history size and an arbitrary prediction size, we have limited both for computational reasons (at the time of writing, the history size is set to 50 datapoints and the prediction size to 25 datapoints).
While the method itself has a few input variables, we use a package (forecast) in R that can calculate these for us, automating the entire process so we only have to give it the list of values from the stream.
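The history limit could be applied on the Erlang side before the values are handed over to R. The following is a minimal sketch of that step only. The module name history_sketch and the function recent/2 are hypothetical, the list is assumed to be ordered oldest first, and the call into R is not shown.

```erlang
-module(history_sketch).
-export([recent/2]).

%% recent(N, Values) -> the last N elements of Values.
%% Values is assumed to be ordered oldest first. If the list holds
%% fewer than N elements it is returned unchanged.
recent(N, Values) when is_integer(N), N >= 0 ->
    Len = length(Values),
    case Len > N of
        true  -> lists:nthtail(Len - N, Values);
        false -> Values
    end.
```

For the history size mentioned above, history_sketch:recent(50, Values) would keep only the 50 most recent datapoint values.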