Sample Flink Code That Reads From Kafka and Writes to Cassandra

The context

Last summer, I started an open source project called boontadata-streams.
It is an environment where one can compare big data streaming engines like Apache Flink, Apache Storm or Apache Spark Streaming to name a few.

You can find the project at https://github.com/boontadata/boontadata-streams/.

As of this writing, the project contains an implementation of the scenario with Flink.

The principle is to have the following components:

  • Some Python code simulates IOT objects which
    • sends events to Kafka
    • sends its view of the truth to an Apache Cassandra database
  • Apache Flink consumes the events from Kafka. It calculates aggregates and writes them to the Cassandra database
  • Some other Python code reads the Cassandra database to compare the results between the IOT simulator and Flink

For instance, here is what we get when time windows are calculated based on processing time:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Comparing ingest device and flink for m1_sum
--------------------------------------------------
25 exceptions out of 25
Exceptions are:
window_time device_id category m1_sum_ingest_devicetime m1_sum_flink delta_m1_sum_ingestdevice_flink
0 2016-12-05 14:51:55 90fddddf-c42c-4418-8133-fb3866fe826c cat-1 459.0 NaN NaN
1 2016-12-05 14:52:00 90fddddf-c42c-4418-8133-fb3866fe826c cat-1 909.0 459.0 450.0
2 2016-12-05 14:52:05 90fddddf-c42c-4418-8133-fb3866fe826c cat-1 448.0 942.0 -494.0
3 2016-12-05 14:52:10 90fddddf-c42c-4418-8133-fb3866fe826c cat-1 1087.0 402.0 685.0
4 2016-12-05 14:52:15 90fddddf-c42c-4418-8133-fb3866fe826c cat-1 681.0 1203.0 -522.0
5 2016-12-05 14:52:20 90fddddf-c42c-4418-8133-fb3866fe826c cat-1 NaN 681.0 NaN
6 2016-12-05 14:51:55 90fddddf-c42c-4418-8133-fb3866fe826c cat-2 162.0 NaN NaN
7 2016-12-05 14:52:00 90fddddf-c42c-4418-8133-fb3866fe826c cat-2 901.0 162.0 739.0
8 2016-12-05 14:52:05 90fddddf-c42c-4418-8133-fb3866fe826c cat-2 998.0 920.0 78.0
9 2016-12-05 14:52:10 90fddddf-c42c-4418-8133-fb3866fe826c cat-2 928.0 1033.0 -105.0
10 2016-12-05 14:52:15 90fddddf-c42c-4418-8133-fb3866fe826c cat-2 720.0 928.0 -208.0
11 2016-12-05 14:52:20 90fddddf-c42c-4418-8133-fb3866fe826c cat-2 NaN 768.0 NaN
12 2016-12-05 14:51:55 90fddddf-c42c-4418-8133-fb3866fe826c cat-3 325.0 NaN NaN
13 2016-12-05 14:52:00 90fddddf-c42c-4418-8133-fb3866fe826c cat-3 857.0 227.0 630.0
14 2016-12-05 14:52:05 90fddddf-c42c-4418-8133-fb3866fe826c cat-3 767.0 939.0 -172.0
15 2016-12-05 14:52:10 90fddddf-c42c-4418-8133-fb3866fe826c cat-3 632.0 783.0 -151.0
16 2016-12-05 14:52:15 90fddddf-c42c-4418-8133-fb3866fe826c cat-3 415.0 632.0 -217.0
17 2016-12-05 14:52:20 90fddddf-c42c-4418-8133-fb3866fe826c cat-3 NaN 514.0 NaN
18 2016-12-05 14:49:50 90fddddf-c42c-4418-8133-fb3866fe826c cat-4 49.0 NaN NaN
19 2016-12-05 14:51:55 90fddddf-c42c-4418-8133-fb3866fe826c cat-4 589.0 NaN NaN
20 2016-12-05 14:52:00 90fddddf-c42c-4418-8133-fb3866fe826c cat-4 1078.0 589.0 489.0
21 2016-12-05 14:52:05 90fddddf-c42c-4418-8133-fb3866fe826c cat-4 1310.0 1156.0 154.0
22 2016-12-05 14:52:10 90fddddf-c42c-4418-8133-fb3866fe826c cat-4 905.0 1238.0 -333.0
23 2016-12-05 14:52:15 90fddddf-c42c-4418-8133-fb3866fe826c cat-4 693.0 1079.0 -386.0
24 2016-12-05 14:52:20 90fddddf-c42c-4418-8133-fb3866fe826c cat-4 NaN 820.0 NaN

and here is what we get when extracting time from the event data (event time):

1
2
3
4
5
6
7
8
9
10
11
Comparing ingest device and flink for m1_sum
--------------------------------------------------
5 exceptions out of 21
Exceptions are:
window_time device_id category m1_sum_ingest_devicetime m1_sum_flink delta_m1_sum_ingestdevice_flink
4 2016-12-05 14:53:35 b8c0c5c4-9189-4a0d-a96b-55ca4adafe14 cat-1 388 NaN NaN
9 2016-12-05 14:53:35 b8c0c5c4-9189-4a0d-a96b-55ca4adafe14 cat-2 496 NaN NaN
14 2016-12-05 14:53:35 b8c0c5c4-9189-4a0d-a96b-55ca4adafe14 cat-3 296 NaN NaN
15 2016-12-05 14:51:10 b8c0c5c4-9189-4a0d-a96b-55ca4adafe14 cat-4 49 NaN NaN
20 2016-12-05 14:53:35 b8c0c5c4-9189-4a0d-a96b-55ca4adafe14 cat-4 537 NaN NaN

this second result is better. All the differences can be explained:
those with the 2016-12-05 14:53:35 time window happen because no further event was sent that could trigger the calculation of this time Window;
the 2016-12-05 14:51:10 time window for cat4 corresponds to an event that was sent too late by the IOT simulator (on purpose) so that the streaming engine cannot take it into account.

You can see the full execution log in GitHub.

Here are the versions used in the current implementation:


Component version comments
Python Python 3.5.2 Anaconda 4.2.0 (64-bit)
Apache Kafka 0.10
Apache Flink 1.1.3
Apache Cassandra 3.9
Apache Zookeeper 3.4.9

The project leverages Docker containers, it can run on a single Ubuntu VM with ~14 GB of RAM.
Of course, you are welcome to contribute. We may even let you use one of our Azure VMs while you develop. You can contact me: contact à boontadata.io.

The pom and the code

In order to write that, I relied on sample code on the Internet about Flink consuming Kafka and Flink writing to Cassandra.
Still, it took me some time to put everything together.

What I consumed most of my time on was the Maven configuration file: pom.xml.

I’m not a Java specialist, so I gave up on the JAR size optimization and preferred to have a uber-jar.

I couldn’t make the Kafka 0.10 client working so I used Kafka 0.8.2. This version of the client needs to communicate to Zookeeper. Still, it can read from Kafa 0.10.

In terms of Flink code, I created a pipeline that does the following:

  • reads from Kafka
  • parses message
  • assigns timestamp and watermarks from event data
  • creates a time window in order to remove duplicates (the key is the message id)
  • creates a time window in order to aggregate (based on device id, message category)
  • sends some debugging information to Cassandra debug table about events and their time windows
  • aggredates data
  • writes results to Cassandra agg_events table

Feel free to visit the GitHub repo, or use the code as samples, or fix it and create a pull request when you find ways to improve it, or add your own implementation on other streaming engines like Storm, Spark streaming or others.

A copy of the most significant pieces of code

The rest of this post is a copy of the most significant pieces of code

pom.xml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.boontadata.flink1</groupId>
<artifactId>flink1</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>Flink flink1 Job</name>
<url>http://boontadata.io</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.1.3</flink.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.9</version>
<executions>
<execution>
<id>unpack</id>
<!-- executed just before the package phase -->
<phase>prepare-package</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/client/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/streaming/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.11</artifactId>
<version>${flink.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/streaming/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>${flink.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/streaming/connectors/kafka/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.2</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>kafka/**</includes>
</artifactItem>
<artifactItem>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.8</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/I0Itec/zkclient\/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_2.11</artifactId>
<version>${flink.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/flink/**,com/datastax/driver/**</includes>
</artifactItem>
<artifactItem>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>com/yammer/metrics/**</includes>
</artifactItem>
<artifactItem>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>0.8.2.2_1</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/kafka/**</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source> <!-- If you want to use Java 8, change this to "1.8" -->
<target>1.8</target> <!-- If you want to use Java 8, change this to "1.8" -->
</configuration>
</plugin>
</plugins>
<!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerId>jdt</compilerId>
</configuration>
<dependencies>
<dependency>
<groupId>org.eclipse.tycho</groupId>
<artifactId>tycho-compiler-jdt</artifactId>
<version>0.21.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<versionRange>[2.4,)</versionRange>
<goals>
<goal>single</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

StreamingJob.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package io.boontadata.flink1;
import com.datastax.driver.core.Cluster;
import java.lang.Double;
import java.lang.Long;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.Iterator;
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* Skeleton for a Flink Streaming Job.
*
* For a full example of a Flink Streaming Job, see the SocketTextStreamWordCount.java
* file in the same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
* cluster.
* Just type
* mvn clean package
* in the projects root directory.
* You will find the jar in
* target/quickstart-0.1.jar
* From the CLI you can then run
* ./bin/flink run -c io.boontadata.flink1.StreamingJob target/quickstart-0.1.jar
*
* For more information on the CLI see:
*
* http://flink.apache.org/docs/latest/apis/cli.html
*/
public class StreamingJob {
private static final String VERSION = "161205a";
private static final Integer FIELD_MESSAGE_ID = 0;
private static final Integer FIELD_DEVICE_ID = 1;
private static final Integer FIELD_TIMESTAMP = 2;
private static final Integer FIELD_CATEGORY = 3;
private static final Integer FIELD_MEASURE1 = 4;
private static final Integer FIELD_MESAURE2 = 5;
public static void main(String[] args) throws Exception {
String timeCharacteristic = "EventTime";
if (args.length > 0) {
timeCharacteristic = args[0];
}
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
env.setParallelism(2);
if (timeCharacteristic.equals("EventTime")) {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
} else if (timeCharacteristic.equals("ProcessingTime")) {
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
} else if (timeCharacteristic.equals("IngestionTime")) {
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
}
Format windowTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// properties about Kafka
Properties kProperties = new Properties();
kProperties.setProperty("bootstrap.servers", "ks1:9092,ks2:9092,ks3:9092");
kProperties.setProperty("zookeeper.connect", "zk1:2181");
kProperties.setProperty("group.id", "flinkGroup");
// get data from Kafka, parse, and assign time and watermarks
DataStream<Tuple6<String, String, Long, String, Long, Double>> stream_parsed_with_timestamps = env
.addSource(new FlinkKafkaConsumer082<>(
"sampletopic",
new SimpleStringSchema(),
kProperties))
.rebalance()
.map(
new MapFunction<String,
Tuple6<String, String, Long, String, Long, Double>>() {
private static final long serialVersionUID = 34_2016_10_19_001L;
@Override
public Tuple6<String, String, Long, String, Long, Double> map(String value) throws Exception {
String[] splits = value.split("\\|");
return new Tuple6<String, String, Long, String, Long, Double>(
splits[FIELD_MESSAGE_ID],
splits[FIELD_DEVICE_ID],
Long.parseLong(splits[FIELD_TIMESTAMP]),
splits[FIELD_CATEGORY],
Long.parseLong(splits[FIELD_MEASURE1]),
Double.parseDouble(splits[FIELD_MESAURE2])
);
}
}
)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
// deduplicate on message ID
WindowedStream stream_windowed_for_deduplication = stream_parsed_with_timestamps
.keyBy(FIELD_MESSAGE_ID)
.timeWindow(Time.of(5000, MILLISECONDS), Time.of(5000, MILLISECONDS));
DataStream<Tuple6<String,String,Long,String,Long,Double>> stream_deduplicated = stream_windowed_for_deduplication
.apply(new WindowFunction<Tuple6<String, String, Long, String, Long, Double>,
Tuple6<String, String, Long, String, Long, Double>, Tuple, TimeWindow>() {
// remove duplicates. cf http://stackoverflow.com/questions/35599069/apache-flink-0-10-how-to-get-the-first-occurence-of-a-composite-key-from-an-unbo
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple6<String, String, Long, String, Long, Double>> input,
Collector<Tuple6<String, String, Long, String, Long, Double>> out) throws Exception {
out.collect(input.iterator().next());
}
});
// Group by device ID, Category
WindowedStream stream_windowed_for_groupby = stream_deduplicated
.keyBy(FIELD_DEVICE_ID, FIELD_CATEGORY)
.timeWindow(Time.of(5000, MILLISECONDS), Time.of(5000, MILLISECONDS));
// add debug information on stream_windowed_for_groupby
stream_windowed_for_groupby
.apply(new WindowFunction<Tuple6<String, String, Long, String, Long, Double>,
Tuple2<String, String>, Tuple, TimeWindow>() {
@Override
public void apply(Tuple keyTuple, TimeWindow window, Iterable<Tuple6<String, String, Long, String, Long, Double>> input,
Collector<Tuple2<String, String>> out) throws Exception {
for(Iterator<Tuple6<String, String, Long, String, Long, Double>> i=input.iterator(); i.hasNext();) {
Tuple6<String, String, Long, String, Long, Double> value = i.next();
out.collect(new Tuple2<String, String>(
"v" + VERSION + "- stream_windowed_for_groupby - " + Instant.now().toString(),
"MESSAGE_ID=" + value.getField(0).toString() + ", "
+ "DEVICE_ID=" + value.getField(1).toString() + ", "
+ "TIMESTAMP=" + value.getField(2).toString() + ", "
+ "time window start=" + (new Long(window.getStart()).toString()) + ", "
+ "time window end=" + (new Long(window.getEnd()).toString()) + ", "
+ "CATEGORY=" + value.getField(3).toString() + ", "
+ "M1=" + value.getField(4).toString() + ", "
+ "M2=" + value.getField(5).toString()
));
}
}
})
.addSink(new CassandraTupleSink<Tuple2<String, String>>(
"INSERT INTO boontadata.debug"
+ " (id, message)"
+ " VALUES (?, ?);",
new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("cassandra1").withPort(9042)
.addContactPoint("cassandra2").withPort(9042)
.addContactPoint("cassandra3").withPort(9042)
.build();
}
}));
// calculate sums for M1 and M2
DataStream<Tuple5<String, String, String, Long, Double>> stream_with_aggregations = stream_windowed_for_groupby
.apply(new WindowFunction<Tuple6<String, String, Long, String, Long, Double>,
Tuple5<String, String, String, Long, Double>, Tuple, TimeWindow>() {
// sum measures 1 and 2
@Override
public void apply(Tuple keyTuple, TimeWindow window, Iterable<Tuple6<String, String, Long, String, Long, Double>> input,
Collector<Tuple5<String, String, String, Long, Double>> out) throws Exception {
long window_timestamp_milliseconds = window.getEnd();
String device_id=keyTuple.getField(0); // DEVICE_ID
String category=keyTuple.getField(1); // CATEGORY
long sum_of_m1=0L;
Double sum_of_m2=0.0d;
for(Iterator<Tuple6<String, String, Long, String, Long, Double>> i=input.iterator(); i.hasNext();) {
Tuple6<String, String, Long, String, Long, Double> item = i.next();
sum_of_m1 += item.f4; // FIELD_MEASURE1
sum_of_m2 += item.f5; // FIELD_MESAURE2
}
out.collect(new Tuple5<String, String, String, Long, Double>(
windowTimeFormat.format(new Date(window_timestamp_milliseconds)),
device_id,
category,
sum_of_m1,
sum_of_m2
));
}
});
// send aggregations to destination
stream_with_aggregations
.addSink(new CassandraTupleSink<Tuple5<String, String, String, Long, Double>>(
"INSERT INTO boontadata.agg_events"
+ " (window_time, device_id, category, m1_sum_flink, m2_sum_flink)"
+ " VALUES (?, ?, ?, ?, ?);",
new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("cassandra1").withPort(9042)
.addContactPoint("cassandra2").withPort(9042)
.addContactPoint("cassandra3").withPort(9042)
.build();
}
}));
// execute program
env.execute("io.boontadata.flink1.StreamingJob v" + VERSION + " (" + timeCharacteristic + ")");
}
}

BoundedOutOfOrdernessGenerator.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package io.boontadata.flink1;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.watermark.Watermark;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
// cf https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html
/**
* This generator generates watermarks assuming that elements come out of order to a certain degree only.
* The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
* elements for timestamp t.
*/
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Tuple6<String, String, Long, String, Long, Double>> {
private final long maxOutOfOrderness = 1_000L; // 1 second
private long currentMaxTimestamp = 0;
@Override
public long extractTimestamp(Tuple6<String, String, Long, String, Long, Double> element, long previousElementTimestamp) {
long timestamp = element.f2; // get processing timestamp from current event
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}

:-) b

Tutoriel - Synchronisation De Votre Application De Fonction Avec Le Concentrateur IoT Et Exemple De Connexion Avec Un Objet Sigfox

Ce tutoriel a été rédigé par Eugénie Vinet, stagiaire à la Direction Technique de Microsoft France de avril à août 2016 avec l’aide de son maître de stage Philippe Beraud, Chief Security Advisor à la Direction Technique.
Je remercie Eugénie et Philippe pour cette contribution.

Synchroniser une application de fonction avec un concentrateur IoT

Pour créer une application de fonction avec Azure Function App, procédez comme suit :

1/ Connectez-vous au portail d’Azure à l’adresse https://portal.azure.com

2/ Cliquez sur New en haut à gauche et cherchez Function App.

3/ Cliquez sur create. Entrez ensuite un nom, lié le à un de vos groupes de ressources existants (ou à défaut créez-en un nouveau). Si vous voulez que votre application de fonction fonctionne en continu, choisissez un plan de service B1 associé au minimum.
Le portail Function App s’affiche

4/ Cliquez sur Or create your own custom function en bas. Une liste des modèles disponibles s’affiche pour votre application de fonction.

5/ Selon votre choix de programmation (C# ou Node) cliquez sur EventHubTrigger – C# ou EventHubTrigger – Node.
Azure Function App propose de déclencher une fonction à partir d’un événement dans un concentrateur d’événements, mais pas dans un concentrateur IOT; cependant, un concentrateur IOT peut être vu comme un concentrateur d’événements. C’est ce que nous allons faire.

6/ Précisez le nom de votre application de fonction et configurez la connexion avec votre concentrateur IoT dans Event Hub connection.

Pour connaître le nom et la chaîne de connexion du concentrateur IoT :

a. Rendez-vous dans les paramètres de votre concentrateur IoT puis cliquez sur Messaging.

b. Notez la valeur de Event-Hub compatible name (par exemple DecodeSigfoxHub dans notre illustration) ainsi que celle de Compatible endpoint.

c. Toujours au niveau de votre concentrateur IoT, allez ensuite dans Shared Access Policies et copier la connection string – primary Key. Enlevez la partie HostName à la chaine de caractères de manière à ne garder que SharedAccessKeyName et SharedAccessKey. Vous obtenez par exemple :
SharedAccessKeyName=iothubowner;SharedAccessKey=MrsSPDzU5=8g2gZZPKXOJ9EJ5RDlo2PVzmPuGniLRwQ=

d. Retournez dans la fenêtre de votre application de fonction - ne fermez pas la fenêtre de paramètres de votre concentrateur IoT, vous en aurez besoin plus tard -. Nommez votre application de fonction comme bon vous semble :)

e. Dans Event Hub Name, entrez la valeur précédente de Event-Hub compatible name. Attention cependant, il ne supporte que les noms en minuscule. S’il y a des majuscules dans votre valeur de Event hub compatible name, passez tout en minuscule (par exemple decodesigfoxhub dans notre illustration)

f. Cliquez ensuite sur select à côté de Event Hub connection. Cliquez ensuite sur Add a connection string en haut à droite.

g. Dans Connection name, spécifiez le nom que vous souhaitez.

h. Dans Connection string, précisez :
Endpoint=<CompatibleEndpoint><SharedAccessKeyName><SharedAccessKey>
Vous obtenez une chaine de caractère semblable à :
Endpoint=sb://iothub-ns-decodesigf-499548662a53f6c.servicebus.windows.net/;SharedAccessKeyName=iothubowner;SharedAccessKey=MrsSPDzU5=8g2gZZPKXOJ9EJ5RDlo2PVzmPuGniLRwQ=

7/ Validez.
Votre application de fonction s’exécutera alors à chaque message reçu par l’utilisateur.
Vous pouvez alors modifier les informations que vous recevez de votre concentrateur IoT pour les renvoyer autre part ou les stocker par exemple.
Vous trouverez ci-après un exemple concret d’usage d’une application de fonction dans notre contexte, à savoir le décodage de trames Sigfox pour les renvoyer sur la solution préconfigurée de surveillance à distance (Cf. portail associé à l’adresse https://www.azureiotsuite.com/) .

Exemple avec IoT Suie - Objet Sigfox

Nous vous proposons dans cette section un exemple de projet utilisant Azure Function App avec un concentrateur IoT.
La solution préconfigurée de surveillance à distance s’attend recevoir des messages sur son concentrateur IoT de type :

1
2
3
4
5
6
{
DeviceId = XX,
Humidity = XX,
Temperature = XX,
ExternalTemperature = XX
}

Dans ce projet nous voulons utiliser des objets Sigfox (voir ici http://makers.sigfox.com/#about) qui envoient des trames de type :
« 422D17 » avec 42 l’identifiant du type d’objet, 2D l’humidité (45%) et 17 la température (en décimal = 23°C). »
On va supposer que l’humidité et la température sont deux entiers qui vont de 0 à 100.

Prérequis

Les prérequis sont les suivants :
• Avoir lié un device Sigfox qui envoie des données de température et d’humidité, Cf. tutoriel PUSH YOUR SIGFOX DEVICES DATA TO AZURE IOT HUB .

• Avoir créé et provisionné une solution préconfigurée de type Solution de surveillance à distance sur le site dédié à l’adresse http://www.azureiotsuite.com/.

• Avoir suivi l’étape précédente de synchronisation d’une Function App avec un concentrateur IoT en utilisant le concentrateur IoT Hub lié à l’objet Sigfox.

Si ces prérequis sont satisfaits, les étapes de mise en œuvre sont les suivantes :

• Etape 1 – Ajout de l’objet préconfiguré.

• Etape 2 – Remplissage du fichier JSON.

• Etape 3 – Ajout des paquets.

• Etape 4 – Contenu de votre application de fonction.

Nous les développons dans l’ordre au travers d’une section dédiée propre.

Etape 1 – Ajout de l’objet préconfiguré

Pour ajouter un objet, procédez comme suit :

  1. Rendez-vous à présent sur l’environnement de démonstration iotmsfrance à l’adresse https://www.azureiotsuite.com/.
  2. Connectez-vous avec votre compte. Comme cela a été indiqué précédemment, vous devez disposer de privilèges d’administration pour pouvoir enregistrer un objet.
    De plus, réalisez cette étape UNIQUEMENT si l’objet n’apparait pas dans la liste des appareils du concentrateur IoT de l’environnement de démonstration (vous pouvez vérifié via le portail Azure ou via l’application Device Explorer). Si votre objet est déjà présent dans la liste, il faudra le supprimer en utilisant le Device Explorer.
    L’application Device Explorer est téléchargeable à l’adresse https://github.com/Azure/azure-iot-sdks/releases. Le répertoire d’installation par défaut de cette application est :
    C:\Program Files (x86)\Microsoft\DeviceExplorer
  3. Une fois sur le site, dans la liste des solutions approvisionnées, sélectionnez la Solution de surveillance à distance, c.à.d. la solution .azurewebsite.net.
  4. Cliquez en bas à gauche sur Ajouter un appareil (ou Add a device en anglais).

    L’étape 1 d’ajout d’un appareil s’affiche.
  5. Dans le cadre Appareil personnalisé, sélectionnez Ajouter un nouveau. L’étape 2 s’affiche.
  6. Sélectionnez l’option Me laisser définir mon propre ID d’appareil et donnez lui le même ID que dans Sigfox /!\ ça ne marchera pas si vous choisissez un id différent.
  7. Cliquez sur Vérifier l’ID. L’ID doit être disponible.
  8. Cliquez enfin sur Créer.
    A ce stade, l’objet n’est pas activé dans la plateforme. Il va falloir modifier les informations relatives à l’objet dans la base NoSQL DocumentDB. Pour cela, connectez-vous au portail d’Azure à l’adresse https://portal.azure.com et réalisez l’étape suivante.

Etape 2 – Remplissage du fichier JSON

Pour remplir le fichier JSON d’informations lié à l’objet, procédez comme suit :

  1. Rendez-vous dans la rubrique DocumentDB Accounts sur le portail Azure à l’adresse https://portal.azure.com.
  2. Sélectionner le compte DocumentDB du projet :
  3. Cliquez ensuite sur votre base de données DevMgmtDb (1). Dans DevMgmtDB, cliquez sur Document Explorer (2).
    Vous devriez voir apparaitre une liste comme celle-ci :
{
    "DeviceProperties": {
    "DeviceID": "{NomDeVotreDevice}",
    "HubEnabledState": true,
    "CreatedTime": "0001-01-01T00:00:00",
    "DeviceState": "normal",
    "Manufacturer": "{Manufacturer}",
    "ModelNumber": "{NomDuModelDuDevice}",
    "SerialNumber": "{NumeroDeSérie}",
    "FirmwareVersion": "{FirmwareVersion}",
    "Platform": "{Plateform}",
    "Latitude": {Longitude} ,
    "Longitude": {Latitude},
    "Processor": "{processor}",
    "InstalledRAM": "{installedRam}",
},
"SystemProperties": {
    "ICCID": null
},
 "Commands": [
    {
      "Name": "PingDevice",
      "Parameters": null
    },
    {
      "Name": "StartTelemetry",
      "Parameters": null
    },
{
  "Name": "StopTelemetry",
  "Parameters": null
},
{
  "Name": "ChangeSetPointTemp",
  "Parameters": [
    {
      "Name": "SetPointTemp",
      "Type": "double"
    }
  ]
},
{
  "Name": "DiagnosticTelemetry",
  "Parameters": [
    {
      "Name": "Active",
      "Type": "boolean"
    }
  ]
},
{
  "Name": "ChangeDeviceState",
  "Parameters": [
    {
      "Name": "DeviceState",
      "Type": "string"
    }
  ]
}
],
"CommandHistory": [],
"IsSimulatedDevice": 0,
 "id": "",
 "Telemetry": [
    {
  "Name": "Temperature",
  "DisplayName": "Temperature",
  "Type": "double"
},
{
  "Name": "Humidity",
  "DisplayName": "Humidity",
  "Type": "double"
}
],
 "Version": "1.0",
 "ObjectType": "DeviceInfo"
 }
}

Si vous n’avez pas réalisé l’étape 3 si l’objet existait déjà dans le concentrateur IoT, il est possible qu’il n’y ait aucun fichier relatif à l’objet dans la base de données. Il va alors falloir en créer un en cliquant en haut à gauche.
Par contre si vous aviez réalisé l’étape 3, le fichier relatif au device devrait se trouver dans la liste. Il devrait avoir un nom de type « 2f23278e-0e82-4850-86f5-b5c3f83a8850 » ou bien le nom du device comme « BFFBA ». Pour vérifier si un fichier est lié à votre objet, il suffit de l’ouvrir et de lire le DeviceID. Il doit être identique à l’ID que vous avez enregistré sur la page web de la solution de surveillance à distance et identique à l’ID de l’objet enregistré sur le portail Sigfox. (1)
Supprimez ou modifiez le contenu du fichier dont le DeviceID est le nom de votre objet et recopier le texte à droite puis complétez/modifiez les zones en jaune comme suit :

    HubEnabledState : doit être passé à true.
    DeviceState : doit être normal. 
    CreatedTime : doit être de la forme : AAAA-MM-JJThh:mm:ss 
Avec AAAA l’année, MM le mois, JJ le jour, hh l’heure, mm les minutes et ss les secondes. 
Il est recommandé de mettre la date et l’heure qu’il est au moment où vous remplissez le fichier mais cela ne va pas avoir d’impact sur le fonctionnement général de la surveillance à distance. 
    SerialNumber : doit préciser le numéro de série de l’objet si celui en est « équipé ». A défaut, écrivez le nom de l’objet entre guillemets.
    Manufacturer : doit préciser le nom du fabricant de l’objet entre guillemets.
    ModelNumber : doit préciser le numéro de modèle de l’objet si celui en est « équipé ». A défaut, écrivez le nom de l’objet entre guillemets
    FirmwareVersion, Platform, Processor et InstalledRam : si vous ne les connaissez pas pour le device utilisé vous pouvez écrire « None ». 
    Latitude et Longitude : vous avez le choix de son emplacement. L’objet s’affichera à l’endroit indiqué sur la carte. Vous pouvez vous aider de ce site : http://www.latlong.net/ pour trouver la longitude et la latitude d’un endroit précis. Attention : la longitude et la latitude ne doivent pas être écrites entre guillemets sur le document. 
    Id : sera l’ID du document que vous avez créé/modifié. Il est recommandé d’utiliser le nom de l’objet afin de retrouver plus facilement le document. 

Etape 3 - Ajout des paquets

Tout d’abord nous allons ajouter les paquets nécessaires à notre application de fonction. Pour ceci, nous allons suivre les étapes pour charger un fichier project.json contenant le nom de tous nos paquets (rubrique Package Management sur le site https://azure.microsoft.com/en-us/documentation/articles/functions-reference-csharp/ et File Update ici https://azure.microsoft.com/en-us/documentation/articles/functions-reference/#fileupdate ).
Procédez comme suit :

  1. Rendez-vous dans les outils de votre application de fonction.
  2. Cliquez sur Kudu puis sur accédez. Vous vous retrouvez donc sur le système de gestion des fichiers de votre application de fonction.
  3. Allez dans Debug Console puis PowerShell. En utilisant la commande ou bien en utilisant l’explorateur de fichier au-dessus rendez-vous sur D:\home\site\wwwroot\>
  4. Ajoutez alors un fichier project.json qui contient les lignes suivantes - un simple glissé-déposé suffit - :

    {
    “frameworks”: {
    “net46”:{

    "dependencies": {                                                
            "Microsoft.Azure.Devices.Client":"1.0.6",
            "Newtonsoft.Json":"8.0.3",                
            "Microsoft.Azure.Devices":"1.0.6"            
            }
        }      
    }
    

    }

Etape 4 – Définition du contenu de votre application de fonction

Pour définir le contenu de votre application de fonction, procédez comme suit :

1/ Retournez sur votre application de fonction et ajoutez la classe DataToReceive. Le contenu dépendra du contenu du JSON entré pour le callback de votre Device Type dans le portail Sigfox.
Si vous avez par exemple ce JSON :

{
   "device" : "{device}",
   "data" : "{data}",
   "time" : {time},
   "snr" : {snr},
   "station" : "{station}",
   "rssi" : {rssi},
   "seqNumber" : {seqNumber}
}

Votre classe DataToReceive doit ressembler à cela :

public class DataToReceive
{
    public string device { get; set; }
    public string data { get; set; }
    public double time { get; set; }
    public string seqNumber { get; set; }
    public string rssi { get; set; }
    public string snr { get; set; }
    public string station { get; set; }
}

Il faut qu’il y ait au minimum les variables « device » et « data ».

2/ Créer ensuite une classe DataToSend qui réunira les informations que vous voulez envoyer dans votre solution préconfigurée de surveillance à distance. Nous envoyons ici que l’Id de l’objet, la température et l’humidité. Vous pouvez ajouter le paramètre External Température.

1
2
3
4
5
6
public class DataToSend
{
public string DeviceId { get; set; }
public double Humidity { get; set; }
public double Temperature { get; set; }
}

3/ Créez la classe SendToIotHub comme suit. Remplacez bien les valeurs de connectionString et d’iotHubUri par les éléments relatifs au concentrateur IoT de la solution préconfigurée.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class SendToIotHub
{
DeviceClient deviceClient;
Device device;
string jsonMessageToSend;
string connectionString = "{IotHubConnexionString}";
string iotHubUri = "{IotHubName}.azure-devices.net";
public async void sendData(DataToSend data, TraceWriter log)
{
RegistryManager registryManager = RegistryManager.CreateFromConnectionString(connectionString);
device = new Device(data.DeviceId);
device = await AddDeviceAsync1(data.DeviceId, registryManager);
deviceClient = DeviceClient.Create(iotHubUri, new DeviceAuthenticationWithRegistrySymmetricKey(data.DeviceId,device.Authentication.SymmetricKey.PrimaryKey.ToString()));
jsonMessageToSend = JsonConvert.SerializeObject(data);
var messageToSend = new Microsoft.Azure.Devices.Client.Message(System.Text.Encoding.ASCII.GetBytes(jsonMessageToSend));
deviceClient.SendEventAsync(messageToSend);
log.Info($"C# Event Hub trigger function processed a message: {jsonMessageToSend}");
}
private async Task<Device> AddDeviceAsync1(string deviceId, RegistryManager registryManager)
{
Device device;
try
{
device = await registryManager.AddDeviceAsync(new Device(deviceId));
}
catch (DeviceAlreadyExistsException)
{
device = await registryManager.GetDeviceAsync(deviceId);
}
return device;
}
}

4/ Ajoutez les directives using « de circonstance » au début de votre application de fonction :

1
2
3
4
5
using Microsoft.Azure.Devices;
using Microsoft.Azure.Devices.Client;
using System;
using Microsoft.Azure.Devices.Common.Exceptions;
using Newtonsoft.Json;

5/ Complétez maintenant la méthode Run :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void Run(string myEventHubMessage, TraceWriter log)
{
log.Info($"C# Event Hub trigger function processed a message: {myEventHubMessage}");
DataToReceive receivedData = new DataToReceive();
DataToSend sentData = new DataToSend();
SendToIotHub sendDataToIotHub = new SendToIotHub();
receivedData = JsonConvert.DeserializeObject<DataToReceive>(myEventHubMessage);
if (receivedData.data != null && receivedData.data.Substring(0, 2) == "42")
{
string data = receivedData.data;
sentData.Humidity = Convert.ToInt32(data.Substring(2, 2), 16);
sentData.Temperature = Convert.ToInt32(data.Substring(4, 2), 16);
sentData.DeviceId = receivedData.device;
sendDataToIotHub.sendData(sentData, log);
}
}

Vous obtiendrez quelque chose de similaire à la capture qui suit si tout va bien :

Vous devriez voir dans les logs les messages reçus et envoyés si vous gardez les lignes de code log.info (…..).

Bien sûr, nous partons du principe que les trames à utiliser sont de la forme « 422D17 » (comme développé précédemment) mais il est possible que vous ayez à faire des calculs sur chaque élément de votre trame pour obtenir les valeurs décimales renvoyées par vos capteurs. Pour ceci, renseignez-vous auprès du constructeur de l’objet.

Et voilà !
Ceci conclut ce tutoriel.

Deploying Containers to One or Several VMs

Here is a sample test to show how Docker Swarm and Docker Compose simplify network management.

We’ll deploy a topology of 3 containers and see how they can reach each other. In the first case, we deploy it on a single node. In the second case, we deploy it on a 2 host swarm cluster.

The Swarm cluster is created as a Swarm Azure Container Service instance with the main following parameters:

  • Orchestrator configuration: Swarm (btw, other option for ACS is DC/OS)
  • Agent count: 2
  • Agent virtual machine size: (leave default)
  • Master count: 1
  • DNS prefix for container service: myacs

Two files are created

The Dockerfile contains:

1
2
3
FROM busybox
ENTRYPOINT ["init"]

The docker-compose.yml file has the following content:

1
2
3
4
5
6
7
8
9
10
11
version: '2'
services:
node1:
build: .
container_name: n1
node2:
build: .
container_name: n2
node3:
build: .
container_name: n3

on a single box

Here is what I get on the single box:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
benjguin@benjguinu1605a:~/simpletest$ docker-compose up -d
Creating network "simpletest_default" with the default driver
Creating n1
Creating n3
Creating n2
benjguin@benjguinu1605a:~/simpletest$ docker-compose ps
Name Command State Ports
------------------------------
n1 init Up
n2 init Up
n3 init Up
benjguin@benjguinu1605a:~/simpletest$ docker exec n1 ping -c 2 n2
PING n2 (172.19.0.4): 56 data bytes
64 bytes from 172.19.0.4: seq=0 ttl=64 time=0.086 ms
64 bytes from 172.19.0.4: seq=1 ttl=64 time=0.067 ms
--- n2 ping statistics ---
2 packets transmitted, 2 packets received, 0% packet loss
round-trip min/avg/max = 0.067/0.076/0.086 ms
benjguin@benjguinu1605a:~/simpletest$ docker exec n1 ping -c 2 n3
PING n3 (172.19.0.3): 56 data bytes
64 bytes from 172.19.0.3: seq=0 ttl=64 time=0.072 ms
64 bytes from 172.19.0.3: seq=1 ttl=64 time=0.072 ms
--- n3 ping statistics ---
2 packets transmitted, 2 packets received, 0% packet loss
round-trip min/avg/max = 0.072/0.072/0.072 ms
benjguin@benjguinu1605a:~/simpletest$ docker exec n3 ping -c 2 n1
PING n1 (172.19.0.2): 56 data bytes
64 bytes from 172.19.0.2: seq=0 ttl=64 time=0.051 ms
64 bytes from 172.19.0.2: seq=1 ttl=64 time=0.060 ms
--- n1 ping statistics ---
2 packets transmitted, 2 packets received, 0% packet loss
round-trip min/avg/max = 0.051/0.055/0.060 ms

on a Swarm cluster

Let’s now do the same from the master node of my ACS cluster.

network information was added to the docker-compose.yml file:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
version: '2'
services:
node1:
build: .
container_name: n1
networks:
- net34
node2:
build: .
container_name: n2
networks:
- net34
node3:
build: .
container_name: n3
networks:
- net34
networks:
net34:
driver: overlay

and here is the result of the test:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
export DOCKER_HOST=172.16.0.5:2375
benjguin@swarm-master-B295EC2C-0:~$ docker-compose up -d
Creating network "benjguin_net34" with driver "overlay"
Creating n1
Creating n3
Creating n2
benjguin@swarm-master-B295EC2C-0:~$ docker inspect n1 | grep swarm-agent
"Name": "swarm-agent-B295EC2C000001",
benjguin@swarm-master-B295EC2C-0:~$ docker inspect n2 | grep swarm-agent
"Name": "swarm-agent-B295EC2C000000",
benjguin@swarm-master-B295EC2C-0:~$ docker inspect n3 | grep swarm-agent
"Name": "swarm-agent-B295EC2C000000",
benjguin@swarm-master-B295EC2C-0:~$ docker exec n2 ping -c 2 n1
PING n1 (10.0.0.2): 56 data bytes
64 bytes from 10.0.0.2: seq=0 ttl=64 time=1.088 ms
64 bytes from 10.0.0.2: seq=1 ttl=64 time=0.830 ms
--- n1 ping statistics ---
2 packets transmitted, 2 packets received, 0% packet loss
round-trip min/avg/max = 0.830/0.959/1.088 ms
benjguin@swarm-master-B295EC2C-0:~$ docker exec n1 ping -c 2 n2
PING n2 (10.0.0.4): 56 data bytes
64 bytes from 10.0.0.4: seq=0 ttl=64 time=0.671 ms
64 bytes from 10.0.0.4: seq=1 ttl=64 time=0.695 ms
--- n2 ping statistics ---
2 packets transmitted, 2 packets received, 0% packet loss
round-trip min/avg/max = 0.671/0.683/0.695 ms
benjguin@swarm-master-B295EC2C-0:~$ docker exec n1 ping -c 2 n3
PING n3 (10.0.0.3): 56 data bytes
64 bytes from 10.0.0.3: seq=0 ttl=64 time=1.050 ms
64 bytes from 10.0.0.3: seq=1 ttl=64 time=0.783 ms
--- n3 ping statistics ---
2 packets transmitted, 2 packets received, 0% packet loss
round-trip min/avg/max = 0.783/0.916/1.050 ms
benjguin@swarm-master-B295EC2C-0:~$ docker exec n2 ping -c 2 n3
PING n3 (10.0.0.3): 56 data bytes
64 bytes from 10.0.0.3: seq=0 ttl=64 time=0.068 ms
64 bytes from 10.0.0.3: seq=1 ttl=64 time=0.076 ms
--- n3 ping statistics ---
2 packets transmitted, 2 packets received, 0% packet loss
round-trip min/avg/max = 0.068/0.072/0.076 ms

Conclusion

We have containers in a common network which is described only thru docker means.
This works on a single host, and also on multiple hosts (in the example, a host had 1 container, another host had 2 of the 3 containers).

MapR on Azure

Introduction

There are different ways to install Hadoop on Azure. The blog post about the different flavors of Hadoop will provide more context.

This blog post shows the main steps to start with MapR on Azure.

How to

In order to install the cluster, follow the wizzard that you’ll find in Azure portal.

Here is a quick view of this wizzard:

Step 3 gives you a chance to download the generated Azure resource manager wizzard that you can modify and deploy as described in the following article: Deploy an application with Azure Resource Manager template.

Once you’ve created the cluster, go to https://{yourclustername}-node0.{install-location}.cloudapp.azure.com:9443 and connect.

In my case, I named the cluster mapr34 and installed it in North Europe region, so it is https://mapr34-node0.northeurope.cloudapp.azure.com:9443.

NB: this mapr34-node0.northeurope.cloudapp.azure.comhost name can be found in the portal when you browse the resource group where the cluster is. It’s attached to the public IP of the node.

Use mapr as the username and the password you provided in step 2 of the wizzard as the password.

Select each node and check the disks where you want to install the distributed file system.
/dev/sdb1 is the cache disk. The 1023 GB disks are VHDs.

Use the Next button to move on

Click Install -> to start the installation process.

After a number of minutes, the installation completes.

On the final step, you can find a link to a short name. Unless you’ve created an SSH tunnel to your cluster, you may need to use the long name instead. In this example where my cluster is called mapr34 and is installed in North Europe, the URL is https://mapr34node1:8443/. I replace it by https://mapr34-node1.northeurope.cloudapp.azure.com:8443.

NB: this mapr34-node1.northeurope.cloudapp.azure.comhost name can be found in the portal when you browse the resource group where the cluster is. It’s attached to the public IP of the node.

ypou connect with the same credentials as before: mapr/{the password you provided in step 2 of the creation wizzard}.

Now that the MapR file system is installed. Let’s see it as HDFS. Let’s also check if we can access Azure blob storage.

The wasb driver (wasb stands for Windows Azure Storage Blob) is not installed by default :-( .

If you go back to the installation page you’ll have the option to install additional services:

When you’re done, you can stop the services, before shutting down the Azure virtual machines. If there are many nodes, you may want to use Azure PowerShell module or Azure Command Line Interface (Azure CLI). You can find them in the resources section of azure.com.

You may also prefer to remove all the resources that consitute the cluster: VMs, storage, vNet and so on. Of course, all the data will be removed as well, so you are asked to type the resource group before deleting it.

Conclusion

We saw how to create a MapR cluster in Azure. You just have to enter a few parameters in friendly Web interfaces and wait for the cloud and MapR to create everything for you!

:-)
Benjamin (@benjguin)