# System Environment and System Abstraction


In this chapter, we introduce the streaming system environment and its abstraction, and then give a formal description of our modelling problem.

## System Environment

In our modeling work, we use Spark Streaming over a cluster of 6 nodes, and we collect traces from the different workloads consisting of:

- Application-level measures (Spark-related metrics), collected through a Spark listener;
- System measures (CPU, memory, I/O and network metrics), collected using Nmon.

## System Abstraction

A complex analytical task $i$ can be modeled as a logical dataflow program, denoted $P_i$ and shown in Figure 3. When we apply a particular configuration vector $C_i^j$ (whose four components are the batch interval $\varphi$, the block interval, the parallelism and the input rate) to that dataflow program, we obtain a physical execution plan; Figure 4 takes a closer look at one map/reduce (MR) pair of this plan. These configuration parameters determine the number of tasks launched in each map/reduce phase, and thus the number of mapper and reducer nodes.
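To illustrate how a configuration drives the task count, the sketch below encodes one configuration (batch interval, block interval, parallelism, input rate) as a Python dataclass. The class and field names are hypothetical, and the task formulas are assumptions: the map-side count follows the Spark Streaming programming guide, which states that each receiver produces roughly batch interval / block interval blocks per batch (each block becoming one partition, hence one task), while the reduce-side count assumes the parallelism setting fixes the number of shuffle partitions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamingConfig:
    """Hypothetical encoding of one configuration vector C_i^j."""
    batch_interval_s: float   # batch interval (phi), in seconds
    block_interval_ms: int    # block interval, in milliseconds
    parallelism: int          # assumed to set the number of reduce-side tasks
    input_rate_mps: float     # input rate, in millions of samples per second

    def map_tasks_per_batch(self, receivers: int = 1) -> int:
        # Each receiver cuts its data into one block per block interval;
        # every block becomes one partition, i.e. one task of the first stage.
        blocks_per_receiver = int(self.batch_interval_s * 1000 // self.block_interval_ms)
        return receivers * blocks_per_receiver

    def reduce_tasks_per_batch(self) -> int:
        # Assumption: shuffles use the configured parallelism as partition count.
        return self.parallelism


cfg = StreamingConfig(batch_interval_s=5, block_interval_ms=200, parallelism=36, input_rate_mps=0.48)
print(cfg.map_tasks_per_batch(), cfg.reduce_tasks_per_batch())  # 25 36
```

Under these assumptions, halving the block interval doubles the number of map tasks per batch, which is one way the configuration shapes the MR pair shown in the physical plan.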

## Formal Description of the problem

We assume that, for each dataflow program (also called a job, a workload, or an analytical task), there exists a workload characteristics vector that is invariant to time and to configurations. Formally:
Definition 1 (Workload Characteristics). $E_i$ is a real-valued vector satisfying three conditions:
1. Independence. $E_i$ should be independent of the other causal factors (the configuration).
2. Invariance. For the same logical dataflow program, $E_i$ should be (approximately) invariant.
3. Reconstruction. Given $C_i^j$, $E_i$ should carry all the information needed to reconstruct the observed trace vector $O_i^j$.

### Formal description of the modelling system

Under our assumptions, the modelling system can be abstracted as a deterministic function $F$ such that

$$F(C_i^j, E_i) = l_i^j,$$

where $l_i^j$ is the latency of program $P_i$ under configuration $C_i^j$. Interpreted as an optimization problem, our goal is to find the function $F$ such that

$$F = \arg\min_{F} \operatorname{avg}_{i,j}\, \operatorname{distance}\big(F(C_i^j, E_i),\, l_i^j\big),$$

where the average is taken over all training points, so that $F$ is a reasonably good regressor that generalizes well to unseen points.
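A minimal sketch of this optimization, under simplifying assumptions that are ours rather than the document's: $F$ is restricted to a linear model over the concatenation of $C_i^j$ and $E_i$, the distance is the squared error, and the minimization uses plain batch gradient descent. The function name `fit_linear_F` and the toy data are hypothetical.

```python
import random


def fit_linear_F(X, y, lr=0.2, epochs=2000):
    """Approximate argmin_F avg((F(x) - y)^2) over linear models F(x) = w.x + b.

    Each row of X is assumed to be the concatenation [C_i^j..., E_i...],
    and y holds the matching (average) latencies l_i^j.
    """
    n, d = len(X), len(X[0])
    w, b = [0.0] * d, 0.0
    for _ in range(epochs):
        grad_w, grad_b = [0.0] * d, 0.0
        for x, target in zip(X, y):
            err = sum(wk * xk for wk, xk in zip(w, x)) + b - target
            for k in range(d):
                grad_w[k] += 2.0 * err * x[k] / n
            grad_b += 2.0 * err / n
        # One gradient-descent step on the mean squared error.
        w = [wk - lr * gk for wk, gk in zip(w, grad_w)]
        b -= lr * grad_b
    return lambda x: sum(wk * xk for wk, xk in zip(w, x)) + b


# Toy data (purely illustrative): two configuration features and one
# workload feature, with latency generated by a known linear rule.
random.seed(0)
X = [[random.random() for _ in range(3)] for _ in range(50)]
y = [1.0 + 2.0 * c1 - 1.0 * c2 + 0.5 * e for c1, c2, e in X]
F = fit_linear_F(X, y)
print(round(F([0.5, 0.5, 0.5]), 3))
```

Any other regressor (for instance a neural network) fits the same template; only the hypothesis class and the optimizer change, while the objective stays the average distance over training points.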

### Optimization target for workload characteristics

We still need to define the optimization target used to compute the workload characteristics.
Mathematically, the independence and invariance conditions can be captured by:
$$E_i = \arg\min \operatorname{var}_j\big(E_i^j\big) \tag{2.3.3.1}$$
where $E_i^j$ is the encoding obtained from the vector of traces $O_i^j$ of dataflow program $i$ under a particular configuration $C_i^j$. The variance is taken over the same logical dataflow program $i$, across the different observations $O_i^j$ obtained from different configurations $C_i^j$ (that is, with $j$ varying).
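One way to make the variance of a vector-valued encoding concrete (a choice made here for illustration; the text does not fix it) is the mean squared Euclidean distance of the per-configuration encodings to their centroid, i.e. the trace of their covariance matrix, with $m_i$ the number of configurations observed for program $i$:

$$
\bar{E}_i = \frac{1}{m_i}\sum_{j=1}^{m_i} E_i^j,
\qquad
\operatorname{var}_j\big(E_i^j\big) = \frac{1}{m_i}\sum_{j=1}^{m_i} \big\lVert E_i^j - \bar{E}_i \big\rVert_2^2 .
$$

This quantity is zero exactly when every configuration yields the same encoding, which is the ideal case of the invariance condition.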
The third condition (reconstruction) can be captured by:
$$E_i = \arg\min \operatorname{avg}_j\, \operatorname{distance}\big(F'(C_i^j, E_i),\, O_i^j\big) \tag{2.3.3.2}$$

where $F'$ is a function analogous to $F$, defined by $F'(C_i^j, E_i) = O_i^j$.
In practice, we need an algorithm that constructs $E_i$ from existing data so that it satisfies Equations (2.3.3.1) and (2.3.3.2) simultaneously. The two conditions can conflict with each other, so constructing an $E_i$ that satisfies both requirements is the key challenge of our work.
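Since the two targets can conflict, one common way to trade them off (an assumption on our part, not necessarily the method used later in this work) is to minimize a weighted sum of the invariance term of (2.3.3.1) and the reconstruction term of (2.3.3.2). The sketch below only evaluates that combined loss for given encodings; the weight `lam`, the function names, the use of the centroid as $E_i$, and the toy reconstruction function `F_prime` are all hypothetical. An actual encoder would be trained to minimize this quantity.

```python
def vec_mean(vectors):
    """Component-wise mean of a list of equal-length vectors."""
    m = len(vectors)
    return [sum(v[k] for v in vectors) / m for k in range(len(vectors[0]))]


def sq_dist(a, b):
    """Squared Euclidean distance between two vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def combined_loss(encodings, configs, traces, F_prime, lam=1.0):
    """Invariance term + lam * reconstruction term for one program i.

    encodings[j] is E_i^j, obtained from traces[j] = O_i^j under configs[j] = C_i^j.
    """
    E_i = vec_mean(encodings)  # one workload vector shared by all configurations
    invariance = sum(sq_dist(e, E_i) for e in encodings) / len(encodings)
    reconstruction = sum(
        sq_dist(F_prime(c, E_i), o) for c, o in zip(configs, traces)
    ) / len(traces)
    return invariance + lam * reconstruction


# Toy example: a hypothetical F' that scales the workload vector
# by the first configuration parameter.
F_prime = lambda c, e: [c[0] * x for x in e]
configs = [[1.0], [2.0]]
traces = [[1.0, 2.0], [2.0, 4.0]]
encodings = [[1.0, 2.0], [1.0, 2.0]]  # perfectly invariant encodings
print(combined_loss(encodings, configs, traces, F_prime))  # 0.0
```

Raising `lam` favours faithful reconstruction of the traces; lowering it favours encodings that stay constant across configurations.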

# Data generation, preprocessing and data splits

This chapter describes the different workloads (queries) $P_i$ used in our benchmark and details how the corresponding data was generated. It then goes through the preprocessing applied to the data before fitting any regressor, and finally shows how the data is split in each training setting.

In our work, we used a dataset that consists of all the requests made to the 1998 World Cup Web site between April 30, 1998 and July 26, 1998. During this period, the site received 1,352,804,107 requests. From this dataset, we created an artificial data stream source that pumps data into the receivers of the streaming system. The analytical tasks that we used in our modeling can be expressed in CQL (Continuous Query Language).
Workload 1. This analytical task is a windowed aggregate query with selectivity control that computes the number of clicks made by each user every $\varphi$ seconds, where $\varphi$ is the batch interval.

```sql
SELECT userId, COUNT(*) AS counts
FROM UserClicks [Range φ Slide φ]
GROUP BY userId
```
Workload 2. This analytical task is a global aggregate query with selectivity control that computes the number of times each URL has been visited and keeps only the URLs with more than 1000 visits. The computation is performed on each batch as it is received, but the output of this query is only obtained when the stream ends.

```sql
SELECT URL, COUNT(*) AS counts
FROM UserClicks
GROUP BY URL
HAVING counts > 1000
```
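Workload 2 keeps a running count across batches and only applies the `HAVING` filter once the stream ends. A minimal sketch of that incremental pattern, assuming each batch arrives as a list of visited URLs (the function name and input format are hypothetical; in Spark Streaming such state would typically be kept with a stateful operator such as `updateStateByKey`):

```python
from collections import Counter


def global_url_counts(batches, threshold=1000):
    """Fold per-batch URL counts into a global state; filter only at the end."""
    state = Counter()
    for batch in batches:      # each batch is processed as soon as it arrives
        state.update(batch)    # add this batch's per-URL counts to the state
    # Output is produced once the stream ends (HAVING counts > threshold).
    return {url: n for url, n in state.items() if n > threshold}


batches = [["/a", "/b", "/a"], ["/a", "/c"], ["/b", "/a"]]
print(global_url_counts(batches, threshold=2))  # {'/a': 4}
```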
Workload 3. This analytical task is a windowed aggregate query with selectivity and window control that computes the number of clicks made by each user over the last 30 seconds, and updates the count every 20 seconds.

```sql
SELECT URL, COUNT(*) AS counts
FROM UserClicks [Range 30s Slide 20s]
GROUP BY URL
HAVING counts > 1000
```
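To make the `[Range … Slide …]` semantics concrete, the following stdlib-only sketch evaluates a windowed `COUNT(*) ... GROUP BY key HAVING counts > threshold` over timestamped events, emitting one result every `slide_s` seconds for the last `range_s` seconds. It illustrates the window semantics only and is not the Scala/Spark Streaming implementation used in this work; the `(timestamp_s, key)` event format, the function name and the half-open window boundary are assumptions.

```python
from collections import Counter


def sliding_window_counts(events, range_s, slide_s, end_s, threshold=0):
    """Evaluate a windowed COUNT(*) ... GROUP BY key HAVING counts > threshold.

    events: iterable of (timestamp_s, key) pairs.
    A result is emitted every slide_s seconds; each window covers the
    half-open interval (window_end - range_s, window_end].
    """
    events = list(events)
    results = []
    window_end = slide_s
    while window_end <= end_s:
        start = window_end - range_s
        counts = Counter(key for t, key in events if start < t <= window_end)
        results.append((window_end, {k: c for k, c in counts.items() if c > threshold}))
        window_end += slide_s
    return results


# Workload 3-style window: last 30 s, refreshed every 20 s.
clicks = [(1, "/a"), (5, "/a"), (12, "/b"), (25, "/a"), (33, "/b")]
for end, counts in sliding_window_counts(clicks, range_s=30, slide_s=20, end_s=40):
    print(end, counts)
```

Because the range (30 s) exceeds the slide (20 s), consecutive windows overlap, so the click at t = 12 s is counted in both results.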
Workload 4. This analytical task is a windowed operation with control over the window size that keeps reporting, every $\varphi$ second(s), the IDs of the users who have visited the website during the last 30 seconds. Note that here not all resulting tuples are flushed to HDFS.

```sql
SELECT userId
FROM UserClicks [Range 30s Slide φ]
GROUP BY userId
```
Workload 5. This analytical task is a windowed join with selectivity control. The join is performed between the data stream (corresponding to clicks) and a dataset stored in HDFS that contains the page ranks. For every user, it computes the sum of the ranks of the pages that user has visited over the last 30 seconds, and updates the results every $\varphi$ second(s). Note that here not all resulting tuples are flushed to HDFS.

```sql
SELECT userId, SUM(pageRank)
FROM Rankings, UserClicks [Range 30s Slide φ]
WHERE Rankings.pageURL = UserClicks.URL
GROUP BY userId
```
Workload 6. This analytical task is the same as the previous one, except that here all resulting tuples are flushed to HDFS. Its CQL equivalent is therefore identical to that of Workload 5.

```sql
SELECT userId, SUM(pageRank)
FROM Rankings, UserClicks [Range 30s Slide φ]
WHERE Rankings.pageURL = UserClicks.URL
GROUP BY userId
```
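Workloads 5 and 6 join each window of the click stream with a static rankings table read from HDFS. Leaving the windowing aside, the per-window computation can be sketched as a hash join followed by a grouped sum. The dictionary `rankings` (page URL to page rank) stands in for the HDFS dataset, and dropping clicks on pages absent from it mirrors the inner join expressed by the `WHERE` clause; the names and data are illustrative assumptions.

```python
from collections import defaultdict


def sum_page_ranks(window_clicks, rankings):
    """SUM(pageRank) per userId over one window.

    window_clicks: list of (user_id, url) pairs falling inside the window.
    rankings: dict mapping page URL -> page rank (the static table).
    """
    totals = defaultdict(float)
    for user_id, url in window_clicks:
        rank = rankings.get(url)
        if rank is None:       # inner join: clicks on unranked pages are dropped
            continue
        totals[user_id] += rank
    return dict(totals)


rankings = {"/a": 0.5, "/b": 2.0}
window_clicks = [("u1", "/a"), ("u1", "/b"), ("u2", "/a"), ("u2", "/zzz")]
print(sum_page_ranks(window_clicks, rankings))  # {'u1': 2.5, 'u2': 0.5}
```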


## Data generation

After writing the Scala equivalents of the CQL queries above, we ran the analytical tasks in the Spark Streaming environment introduced in Section 2.1 under different system configurations $C^j$, each of which fixes the batch interval $\varphi$, the block interval, the parallelism and the input rate. Each time we choose a value of the input rate, we make sure to set it in both the artificial data pump and the Spark Streaming system. For each configuration $C_i^j$ of program $P_i$, we obtain a vector of traces $O_i^j(t)$, as well as the latencies $l_i^j(t)$ measured after processing each batch.
The data generation process is time-consuming: the configuration can only be changed approximately every 40 minutes. When a Spark Streaming job is run, the first 10 minutes are used to warm up and reach a steady state, and the next 30 minutes are used for trace collection, during which we construct $l_i^j(t)$ and $O_i^j(t)$. Hence a workload (query) can be run on at most 36 configurations per day (24 × 60 / 40 = 36). This is why our multi-objective optimizer needs a strong regressor that makes accurate predictions, so as to avoid wasting computation time. Note that both $l_i^j(t)$ and $O_i^j(t)$ are available when training the regressor, but in practice it is not possible to obtain $O_i^j(t)$ before running the job on configuration $C_i^j$. This is what distinguishes our problem from a classical machine learning problem.

## Preprocessing

The first preprocessing step is to average the latency and observation vectors over time for each configuration $C_i^j$, since we are interested in predicting the average latency obtained for $C_i^j$ rather than the time series itself. The time series carries no meaning of its own in our context: the notion of time only arises because the system processes one batch after another.
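The averaging step can be sketched as a group-by over configurations. The sketch assumes, hypothetically, that the collected traces arrive as per-batch records `(config, latency, observation_vector)` where `config` is a hashable tuple; it returns one averaged latency and one averaged observation vector per configuration. The record layout and the metric values are illustrative only.

```python
from collections import defaultdict


def average_per_config(records):
    """Collapse per-batch time series into one (latency, observations) pair per config."""
    groups = defaultdict(list)
    for config, latency, obs in records:
        groups[config].append((latency, obs))
    result = {}
    for config, rows in groups.items():
        n = len(rows)
        mean_latency = sum(lat for lat, _ in rows) / n
        # Column-wise mean of the observation vectors of this configuration.
        mean_obs = [sum(col) / n for col in zip(*(obs for _, obs in rows))]
        result[config] = (mean_latency, mean_obs)
    return result


records = [
    ((5, 200, 18, 0.48), 1.2, [40.0, 3.0]),   # (config, latency, [metric_1, metric_2])
    ((5, 200, 18, 0.48), 1.4, [60.0, 5.0]),
    ((10, 400, 36, 0.66), 2.0, [70.0, 4.0]),
]
print(average_per_config(records))
```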

## Data Description, Evaluation metric and some statistics

In our experiments, we consider two datasets: $D_1$, consisting of all the data that has been collected, and $D_2 \subset D_1$, corresponding to the traces from the configurations tested across all workloads. The number of samples in $D_1$ is detailed in Table 1.
In dataset $D_2$, the configurations were obtained from all possible combinations of:

- Batch interval (s): $\varphi \in \{5, 10\}$
- Block interval (ms): $\{200, 400, 600, 800, 1000\}$
- Parallelism: $\{18, 36, 54, 72, 96\}$
- Input rate (millions of samples per second): $\{0.48, 0.66\}$

This gives $2 \times 5 \times 5 \times 2 = 100$ configurations in total. However, some of these configurations failed for some workloads, which left us with 92 configurations common to all workloads.
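The size of the $D_2$ grid can be checked by enumerating the Cartesian product of the four parameter lists. The failed configurations are not listed in the text, so the filtering helper below (a hypothetical name) takes them as an input per workload.

```python
from itertools import product

# Parameter values of dataset D2, as listed above.
batch_intervals_s = [5, 10]
block_intervals_ms = [200, 400, 600, 800, 1000]
parallelisms = [18, 36, 54, 72, 96]
input_rates_mps = [0.48, 0.66]  # millions of samples per second

grid = list(product(batch_intervals_s, block_intervals_ms, parallelisms, input_rates_mps))
print(len(grid))  # 100


def common_configs(grid, failed_by_workload):
    """Keep the configurations that did not fail for any workload.

    failed_by_workload: hypothetical mapping workload name -> set of failed configs.
    """
    failed = set().union(*failed_by_workload.values())
    return [c for c in grid if c not in failed]
```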

- 1 Problem Statement
- 2 System Environment and System Abstraction
  - 2.1 System Environment
  - 2.2 System Abstraction
  - 2.3 Formal Description of the problem
    - 2.3.2 Formal description of the modelling system
    - 2.3.3 Optimization target for workload characteristics
- 3 Data generation, preprocessing and data splits
  - 3.2 Data generation
  - 3.3 Preprocessing
  - 3.4 Data Description, Evaluation metric and some statistics
  - 3.5 Training settings and Data splits
- 4 The baseline approaches for regression
  - 4.1 Hyper-parameter selection
  - 4.2 Experiment 1: Standalone regressors
    - 4.2.1 Design
    - 4.2.2 Experimental setup
    - 4.2.3 Results and Analysis using dataset D1 (all configurations)
    - 4.2.4 Results and Analysis using dataset D2 (common configurations)
  - 4.3 Experiment 2: One-hot encoding
    - 4.3.1 Design
    - 4.3.2 Experimental setup
    - 4.3.3 Results and Analysis using dataset D1 (all configurations)
    - 4.3.4 Results and Analysis using dataset D2 (common configurations)
    - 4.3.5 Pros and Cons
- 5 Deep Learning models
  - 5.1 Embedding
    - 5.1.1 Design
    - 5.1.2 Experimental setup
    - 5.1.3 Results and Analysis (using dataset D1, no pre-training)
    - 5.1.4 Results and Analysis (with and without pre-training)
    - 5.1.5 Pros and Cons
  - 5.2 AutoEncoder
    - 5.2.1 Design
    - 5.2.2 Experimental setup
    - 5.2.3 Results and Analysis using dataset D1 (all configurations)
    - 5.2.4 Results and Analysis using dataset D2 (common configurations)
    - 5.2.5 Pros and Cons
- 6 Qualitative and Quantitative Comparison
  - 6.1 Qualitative comparison
  - 6.2 Quantitative comparison
- 7 Related Work
  - 7.1 Contractive Auto-Encoder