Performance database: capturing data for optimizing distributed streaming workflows
Abstract
The performance database (PDB) stores performance-related data gathered during workflow enactment. We argue that, by carefully understanding and manipulating these data, we can improve efficiency when enacting workflows. This paper describes the rationale behind the PDB and proposes a systematic way to implement it. The prototype is built as part of the Advanced Data Mining and Integration Research for Europe project. We use workflows from real-world experiments to demonstrate the use of the PDB.
1. Introduction
It is evident that data-intensive research is transforming the research landscape, as recognized in The Fourth Paradigm [1]. We are facing the challenge of handling the deluge of data generated by sensors and modern instruments that are widely used in all domains. Owing to the scale, complexity and heterogeneity of data gathered in scientific experiments, it is difficult to extract useful information and knowledge through exhaustive and unstructured computations. To survive the data bonanza, we need to improve our apparatus for the exploration and exploitation of the growing wealth of data.
In a previous paper [2], we proposed optimization based on performance information gathered in previous runs. This relies on the assumption that scientific users repeat similar requests over similar data as they iterate their understanding or process various samples while exploring variants and experimental settings [3]. The performance database (PDB) is designed to gather information at the level of processing element (PE) class instances, so that we can determine how each class and data stream behaves. PEs are software components that handle the distributed computation by processing and emitting streams of values, and are connected via data streams to form a workflow. For instance, information collected from a previous enactment can tell whether co-locating certain PEs within the same execution engine will result in poor performance because the PEs compete for resources. To be able to predict the enactment performance in this scenario, we must first obtain information about (i) the performance of the class instances, e.g. processing time per unit of data (unit cost), (ii) the hardware configuration of the enactment platforms, (iii) the software stack running on the enactment platforms, and (iv) the workload of the enactment platforms during the enactment. This requires a good understanding of where and how to collect such data, and of how to transform the data into useful information in the later stages.
This paper describes the rationale behind the PDB, and proposes a systematic way to implement it. In §2, we introduce a four-stage performance data life cycle— collect, organize, use and discard—to describe how the performance data are collected and transformed into information. We propose a schema for the PDB and follow Gray's Laws on approaching a large-scale scientific data engineering challenge [4], and identify important questions that our PDB should answer. Section 3 describes the PDB prototype. We have implemented a measurement framework (MF) using open grid services architecture data access and integration (OGSA-DAI) [5] and conducted experiments with a real-world life sciences use case from the Advanced Data Mining and Integration Research for Europe (ADMIRE) project [6] to collect data into the PDB. Section 4 shows how we use these data to test our hypothesis about optimization. The related work is discussed in §5. We conclude and discuss further research in §6.
2. Performance data life cycle
A typical scientific workflow comprises complex computation steps over heterogeneous data and is enacted on distributed computing platforms. The performance of enacting such a workflow depends on various factors; e.g. the selection of scattered data sources in the workflow may incur a high communication cost in moving data to the computing platforms. Moreover, workflows need to be mapped onto appropriate platforms, in terms of workloads, computing paradigms and resource availability, in order to achieve maximum efficiency. By identifying these factors and designing a structured system to capture relevant performance data, useful information can be extracted to further improve the enactment. In the following sections, we describe in detail how this information is captured and stored in the PDB, and how it can be used during workflow optimization, in the four-stage life cycle: collect, organize, use and discard.
(a) Collect stage: data sources and collecting mechanism
(i) Data sources
Figure 1a shows an overall picture of the data sources for a PDB. The data-intensive virtual machine (DIVM) configuration and the PE deployment and connection data are harvested from system log files, as these record the way the software was configured. The hardware configuration is currently obtained by hand but should be based on Common Information Model data. The timings of enactments are collected by an MF. We define a DIVM as an abstraction of the computational environment in which a PE instance (PEI) runs during enactment. Figure 1b shows a DIVMk, which is an abstraction of the layers of software and hardware. As a typical operational environment is complex, deriving relevant information, e.g. whether two PEIs are competing for the same CPU, is difficult. The DIVM abstraction reduces this complexity by suppressing detail, so that queries on the PDB can discriminate such conflict criteria. The abstraction also has to reflect locality so that the relative costs of inter-PEI communication can be estimated using the PDB.
Figure 1. Data sources for a PDB. (a) Information flow into the PDB and (b) DIVM abstraction.
The PEIs are executed in execution engines (EEs), which form the highest layer of DIVMs. The mapping of a particular EE, e.g.
(ii) Collecting mechanisms
We use system logs and the MF to collect three types of data: configuration of DIVMs, deployment and interconnection of PEIs, and timing of the actual enactments. System logs keep track of the activities involved in managing DIVM lifetimes. The data gathered from the log files allow reconstruction of the software and hardware stack at a given time. Logs are also generated to track which PEIs are running on each DIVM at a given time.
Assume that the log above is generated by the system, and
The MF captures the enactment performance using a specific type of PE, named an observer. As illustrated in figure 2a, an observer receives data on its input stream from the previous PE, time-stamps the data, and outputs them to the following PE without altering their content. By placing observers on the data streams, detailed enactment information can be captured and used to make appropriate optimization decisions. We have designed three types of observer, each with minimal impact on performance and each capturing performance data from a different perspective, as illustrated in figure 2b:
— Type observer captures the type information of the data flowing on any given data stream. Together with the semantic information of the workflows, the type information may be useful in estimating the data transfer cost and in determining whether a data stream can be split and processed in parallel. The type information should be collected prior to enactment. In due course this will be superseded by capturing the equivalent data from the language parser.
— Rate observer measures the data-processing rate of data streams. When used in pairs, rate observers can capture the processing time per unit of data of a PEI. As shown in figure 2b, a rate observer is placed before PEk to capture its input data rate during the enactment. Together with the output data rate measured by another rate observer, this yields the processing rate of PEk.
— Buffer observer observes the buffer implementation of data streams. Buffers are used when there are variations in the data-processing rates of connected PEs. In figure 2b, buffer observers on the two input streams of PEl determine the rates at which data arrive on each stream, from which we can infer the critical path of the workflow.
Figure 2. The design and use of observer. (a) Design of observer and (b) types of observer.
The results collected from observers are sent to a gatherer PE, which will insert these data into the PDB after the enactment is finished. This reduces the overhead incurred during enactment.
(b) Organize stage: organizing performance data in the performance database
The tables in the PDB are divided into three categories according to how their data are collected (figure 3). The first type of table stores data harvested from log files, e.g.
Figure 3. Logical content of the PDB.
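Purely as an illustration of how such tables might be organized (not the schema used in the prototype), tables of roughly the following shape could hold the three kinds of data collected; all table and column names below are hypothetical:

```sql
-- Illustrative sketch only: hypothetical table and column names,
-- not the ADMIRE prototype schema (see figure 3 for the logical content).

-- DIVM configuration, harvested from system logs.
CREATE TABLE divm (
  divm_id      INTEGER PRIMARY KEY,
  processor    VARCHAR(128),   -- hosting processor (machine)
  service_type VARCHAR(32),    -- e.g. gateway, execution engine, data source
  started_at   BIGINT,         -- Unix time stamps from the start-up logs
  stopped_at   BIGINT
);

-- PEI deployment and connections, also from system logs.
CREATE TABLE pei_deployment (
  request_id   VARCHAR(64),    -- workflow request identifier
  pei_id       VARCHAR(64),    -- PE instance identifier (never re-used)
  pe_class     VARCHAR(128),   -- PE class name
  divm_id      INTEGER REFERENCES divm(divm_id)
);

-- Enactment timings, collected by the measurement framework.
CREATE TABLE pipes_events (
  request_id   VARCHAR(64),
  pipe_id      VARCHAR(64),    -- data stream identifier (never re-used)
  end_point    VARCHAR(8),     -- 'source' or 'target' of the stream
  event_type   VARCHAR(16),    -- e.g. a write, read or blocked event
  event_time   BIGINT          -- Unix time stamp
);
```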
(c) Use stage: transforming performance database data into information
Figure 4 illustrates the stages of PDB use. For each stage, we formulate different sets of queries to access the PDB. The PDB data allow us to understand and validate hypotheses about PEs and their enactment behaviour, such as the type of data flowing in a data stream and the processing rate of PEs on different platforms. Following Gray [4], we identify the 20 most prevalent questions covering the three stages. An example is shown below, and two more appear in appendix A.
Figure 4. Overall information flow in the PDB.
Question. What is the characteristic performance of instances of class
Usage. To find the most suitable DIVM on which to enact a PEI. The query will retrieve all of the previous execution records of the PEI on all of the DIVMs, filtered by
This question focuses on the enactment behaviour of the PEIs and tries to validate the effect of DIVM selection on the enactment performance. The last question in appendix A attempts to understand the cohabiting behaviour of PEIs as that should affect the mapping decisions. The information gained from the study of enactment behaviour of the PEs is crucial for constructing a cost model to analyse the performance of previous enactments and to predict the performance of future enactments. For instance, data from previous enactments show that mapping
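As an indicative sketch of such a query (assuming a derived table or view pei_performance with hypothetical columns, built from the deployment and timing data, rather than the prototype's actual schema), the characteristic performance of a PE class on each DIVM could be summarized as follows:

```sql
-- Sketch only: pei_performance(request_id, pei_id, pe_class, divm_id, unit_cost)
-- is a hypothetical derived table; 'ExamplePE' stands for the class of interest.
SELECT divm_id,
       COUNT(*)       AS observed_instances,
       AVG(unit_cost) AS avg_unit_cost,   -- processing time per unit of data
       MIN(unit_cost) AS best_unit_cost,
       MAX(unit_cost) AS worst_unit_cost
FROM pei_performance
WHERE pe_class = 'ExamplePE'
GROUP BY divm_id
ORDER BY avg_unit_cost;
```

The DIVM with the lowest average unit cost, weighted by how many observations support it, would then be a candidate for enacting the PEI.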
(d) Discard stage: cleaning performance database data
The size of the PDB is expected to grow rapidly. The number of rows inserted into the
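One plausible discard policy, shown here only as a sketch over the hypothetical tables of §2b (with an invented summary table and cut-off value), is to retain aggregate statistics while deleting the raw event rows of old requests:

```sql
-- Sketch of a discard policy; pipe_event_summary and the cut-off value are invented.
-- 1. Preserve aggregate information before discarding raw events.
INSERT INTO pipe_event_summary (request_id, pipe_id, event_type, event_count)
SELECT request_id, pipe_id, event_type, COUNT(*)
FROM pipes_events
WHERE event_time < 1293840000000          -- retention cut-off (invented value)
GROUP BY request_id, pipe_id, event_type;

-- 2. Discard the raw per-event rows that have been summarized.
DELETE FROM pipes_events
WHERE event_time < 1293840000000;
```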
3. Prototype implementation
The prototype has been implemented in the ADMIRE project (http://www.admire-project.eu/)—a European-funded project that is pioneering architecture and models that deliver a coherent, extensible and flexible framework to facilitate exploration and exploitation of data.
Figure 5a shows the DIVM abstraction in the prototype implementation. We have reduced the software and hardware stacks for the prototype to four layers. The only hardware layer in the current abstraction is the processor. We assume that the layers below the processor, e.g. network, are kept in separate systems. For the prototype, we consider that each processor runs a single operating system on a single virtual machine; thus the virtual machine layer is immediately on top of the processor layer. The virtual machine encapsulates a packaging of hypervisor, operating system and Java virtual machine; in future, the model may need to characterize the data-storage and communication architectures. Each virtual machine can run one or more services, e.g. an OGSA-DAI engine (ODE), an ADMIRE gateway or a data source. Figure 5b illustrates an example set-up of two ADMIRE sites.
Figure 5. DIVM abstraction in the prototype implementation. (a) DIVM abstraction and (b) example set-up of two ADMIRE sites.
(a) Collecting data from configuration logs
There are three different types of services in the example shown in figure 5b, and the configuration logs are collected as follows:
— When an ADMIRE gateway is started, the gateway start-up script will detect on which processor (machine) it is hosted, e.g.
— When an ODE is started, the ODE will determine which processor (machine) it is running on, e.g.
— To capture when a data source such as a MySQL daemon is launched, we have to add additional commands to the start-up script to capture the data, e.g.
(b) Collecting data from the measurement framework
In an earlier section, we proposed a logical design of the MF that uses three types of observer to collect performance data: the type observer, the rate observer and the buffer observer. The type observer is applied during the Data-Intensive System Process Engineering Language (DISPEL) processing stage [7]. DISPEL is a language developed in the ADMIRE project that is processed to generate a graph for optimization and enactment. The DISPEL language processor walks the generated graph, verifying that source assertions and destination requirements about the structural types of values in each data stream are compatible. The input and output structural types of every PEI in the request are recorded.
Both rate observer and buffer observer are implemented during the enactment stage. We collect these data by observing the pipe buffer implementation in the data stream. During the enactment, the data producer of a data stream writes (
— Requests: a unique identifier for every workflow and a Unix time stamp indicating when the workflow request was received.
— Activities: the PEs and the PEIs used in every workflow. In the prototype, PEs are implemented as OGSA-DAI activities and executed in ODEs; thus the PEs' names appear in this log as OGSA-DAI activities. Every PEI is given a unique identifier. PEIs are connected with data streams (OGSA-DAI pipes). The information about the sources and targets of every data stream is stored separately, in pipes–sources and pipes–targets, which allows the MF to record precisely when every buffer event occurs at both the source and the target of any given data stream.
— Activities–events: the states of every PEI, i.e.
— Pipes: the sources and the targets of data streams (outputs of PEIs) in every workflow. Each data stream (pipe) is given a unique identifier.
— Pipes–events: the Unix time stamp for every buffer event on both pipes–sources and pipes–targets. Four types of events are recorded: write, read, WriteBlocked and ReadBlocked (figure 7); an illustrative sketch of such records follows this list.
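As an illustration only, using the hypothetical pipes_events layout sketched in §2b and invented identifiers (the prototype's actual log format differs), the records produced for a single pipe might look like this:

```sql
-- Hypothetical example records for one pipe: the source PEI writes a unit of
-- data, the target PEI later reads it, and blocking events mark when the
-- buffer is full (source blocked) or empty (target blocked).
-- All identifiers, time stamps and the table layout are invented.
INSERT INTO pipes_events (request_id, pipe_id, end_point, event_type, event_time)
VALUES ('req-0001', 'pipe-17', 'source', 'write',        1299170000123),
       ('req-0001', 'pipe-17', 'target', 'read',         1299170000251),
       ('req-0001', 'pipe-17', 'source', 'WriteBlocked', 1299170000340),
       ('req-0001', 'pipe-17', 'target', 'ReadBlocked',  1299170000590);
```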
PEI and data stream identifiers are unique and never re-used, so that instances in successive requests are not conflated in queries against the PDB. From these enactment data, we extract the information shown in table 1 (a query sketch for some of these operations follows the table). We have automated the instrumentation by embedding the MF in the OGSA-DAI implementation. During the enactment, event listeners run on separate threads to capture the data stated above. There are two ways to organize these data: record them in log files and load the records into the PDB after the enactment, or write them directly to a PDB hosted on a separate DIVM. File-based logging was used first, to decouple the PDB schema design from the instrumentation design. Now that these have stabilized, direct updates to the PDB have been implemented. Our next architectural step is to automate the analysis process, in which the gateway that handled the enactment request triggers the processing of the performance data after each enactment.
Table 1. Information extracted from the enactment data and the operations for extracting it.

| information extracted | operations for extracting information |
|---|---|
| total units of data flow in a data stream | count of |
| time spent in reading from/writing to a data stream | the reading time is the average difference between successive |
| average data production time of a source PEI | the interval between first and last |
| data production rate of a source PEI | the total units of data flow divided by the data production time |
| average data consumption time of a target PEI | same as average data production time, but observing |
| data consumption rate of a target PEI | same as above |
| compare the data production rate of the source PEI and the data consumption rate of the target PEI | observe the number of |
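For example, the first rows of table 1 could be computed with a query of the following shape, again over the hypothetical pipes_events layout from §2b rather than the prototype's actual schema:

```sql
-- Sketch: total units of data, production time and production rate for each
-- data stream of one request, derived from source-side write events.
-- Identifiers are invented; time units follow the time-stamp resolution.
SELECT pipe_id,
       COUNT(*)                          AS total_units,     -- units written
       MAX(event_time) - MIN(event_time) AS production_time, -- first to last write
       CAST(COUNT(*) AS FLOAT)
         / NULLIF(MAX(event_time) - MIN(event_time), 0)
                                         AS production_rate  -- units per time unit
FROM pipes_events
WHERE request_id = 'req-0001'
  AND end_point  = 'source'
  AND event_type = 'write'
GROUP BY pipe_id;
```

The corresponding consumption-side quantities follow by filtering on the target end point and read events instead.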
4. Demonstration of performance database usage
The previous sections describe the overall design of the PDB and how the prototype is implemented in the ADMIRE project. We have deployed the measurement probes in ADMIRE gateways and populated the PDB with performance data collected during the enactment of real-world workflows. The main usage of the PDB is to bridge the gap between data collection and optimization. In this section, we evaluate how performance data for two workflows are used in extracting information for later optimization. The details about the optimization algorithm for streaming workflows are outside the scope of this paper and will be presented in a separate publication.
The first workflow is a data integration and transformation workflow that is frequently found across domains, as shown in figure 6. This workflow is a typical scientific workflow that involves: (i) retrieving data from distributed and heterogeneous data sources (retrieve data from a database using
Figure 6. Common data integration workflow.
Figure 7. Events trace for data streams used in the workflow. (a) Write and read events and (b) WriteBlocked and ReadBlocked events.
The first few
The optimization challenge is to identify which branch of the graph (sub-DAG) is causing the delay. One way to identify this problem is to look at the blocked events. We executed the query below to count the events that occurred during the enactment, in order to plot the graph shown in figure 8 (note that the vertical scale is logarithmic).
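The original query listing is not reproduced here; as an indicative sketch only, assuming the hypothetical pipes_events layout used earlier, a query of the following shape would produce per-stream counts of each event type:

```sql
-- Sketch: count each type of buffer event per data stream for one request
-- (hypothetical table layout and invented request identifier).
SELECT pipe_id,
       event_type,
       COUNT(*) AS event_count
FROM pipes_events
WHERE request_id = 'req-0001'
GROUP BY pipe_id, event_type
ORDER BY pipe_id, event_type;
```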
Figure 8. Events count for data streams used in the workflow.
When we traced the events that occurred in all of the data streams, we found that
The second workflow, shown in figure 9, is used to automate gene expression annotation in the EURExpress-II project [8]. The current annotation is made manually by domain experts, and is costly and time-consuming (4 TB of image data). Optimizing the enactment of this workflow is challenging because finding all of the candidate mappings of 43 PEIs onto DIVMs within multiple constraints (e.g. data source location, load balance among DIVMs) involves exhaustive computation. In general, some of the PEIs have a relatively small unit cost, e.g.
Figure 9. Classifier training in the EURExpress-II workflow.
The total number of units of data flowing along each data stream is important in making mapping decisions. Assigning two PEIs connected by a heavy data stream to separate DIVMs will incur a large communication cost in moving the data across DIVMs. For instance,
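To identify such heavy streams from previous enactments, a query along the following lines could be used; it assumes an additional hypothetical pipes table describing which PEIs each data stream connects, alongside the pipes_events sketch used earlier:

```sql
-- Sketch: rank the data streams of a previous enactment by the number of
-- units that flowed through them, together with the PEIs they connect,
-- to flag pairs that should preferably share a DIVM.
-- pipes(request_id, pipe_id, source_pei, target_pei) is hypothetical.
SELECT p.pipe_id,
       p.source_pei,
       p.target_pei,
       COUNT(*) AS total_units
FROM pipes p
JOIN pipes_events e
  ON e.request_id = p.request_id
 AND e.pipe_id    = p.pipe_id
WHERE p.request_id = 'req-0001'      -- invented identifier
  AND e.end_point  = 'source'
  AND e.event_type = 'write'
GROUP BY p.pipe_id, p.source_pei, p.target_pei
ORDER BY total_units DESC;
```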
5. Related work
Using workflow systems to organize the enactment of scientific experiments is becoming increasingly common and a wide range of workflow systems are available. Deelman et al. [10] propose a taxonomy of workflow features and review existing workflow systems. Some of the workflow systems, e.g. Condor DAGman [11] and Taverna [12], provide monitoring services to monitor events corresponding to the state transition of processing entities. This information is mainly used for fault tolerance and not for optimization. Our work looks into fine-grained performance monitoring of processing entities including timing information of every single unit of data in the data stream.
Similar to our work, ASKALON [13] monitors the status of execution and provides feedback for optimization. Its execution engine allows users to observe various metrics of the execution progress. ASKALON has developed a performance analysis system and a formal overhead analysis model that are useful in estimating task execution times and data transfers in a distributed environment. In contrast, we focus on buffering analysis at the data-stream level and on performance data collected from previous runs to estimate the enactment time.
ParaTrac [14] is a fine-grained profiler that provides an effortless way to extract low-level input/output (I/O) profiles by exploiting user-level file-system and process-tracing techniques. It provides a more accurate study of workflow characteristics and supports fine-grained and realistic scheduling of workflows. However, its file I/O profiling technique is not applicable to streaming workflows. In contrast, our proposed MF looks at the buffer implementations for data streams and keeps track of every buffer event.
DIPAS [15] is a distributed performance analysis service that supports tracing execution, analysing performance overheads and searching for performance problems of Web service-based workflows in the Grid. It can be easily integrated with other Grid workflow middleware services through a loosely coupled Web services resource framework (WSRF)-based interface. Again, the main differentiation is that our work focuses on streaming workflow.
6. Conclusion and future work
The data-streaming model allows overlapping execution of computing elements, which shortens the overall response time of workflow enactment. Data streaming avoids writing intermediate values to disk except when buffers overflow RAM. It can therefore process requests over large-scale data through an efficient implementation of buffering in main memory, where memory access outperforms disk access by a factor of more than 10⁵ [16]. Thus, the streaming model enables users to process large-scale data with modest computing facilities. However, organizing the enactment of streaming workflows is difficult because it requires allocating the whole workflow at once, which in turn requires a good understanding of each PEI's enactment behaviour.
The present work aims to solve this problem by introducing a PDB, where information is extracted from performance data gathered in previous runs. We proposed the performance data life cycle and discussed the rationale for the PDB. We presented a systematic way to implement the PDB with a novel approach to organizing performance data for streaming workflows. We presented an initial design for its schema and illustrated its power with example queries. We built a prototype as part of the ADMIRE project and used it to collect performance data for real-world use cases. We showed phenomena in the derived data that are highly relevant to understanding data streaming and to optimizing workflows. We have then discussed how these data can be used in making optimization decisions.
The value of the PDB depends on two criteria: stability and recognition. The former is related to the consistency of the PDB behaviour across application domains, users, scales and execution environments, while the latter is related to how the PDB identifies and supports numerical characterization of relevant performance phenomena. The data to date are based on small-scale workflows from a few domains and enacted on workstations. We will extend the range of experiments to a representative set of applications and scales. We will then refine the queries and the architecture to handle the new issues exposed. We would be pleased to cooperate in the extension of the PDB to serve any workflow systems and optimization. Work is underway to use the PDB to validate models of streaming data performance and to use these models with parameters from the PDB to optimally deploy data-streaming enactments using distributed data on heterogeneous platforms.
Acknowledgements
The work presented in this paper is supported by the ADMIRE project (funded by EU FP7-ICT-215024) and the e-Science Core Programme Senior Research Fellow programme (funded by the UK EPSRC EP/D079829/1). We also acknowledge our very helpful colleagues in the OGSA-DAI team.
Appendix A. Example questions
What is the performance of instances of PEa compared with instances of PEb?
Usage: To choose between two PEs with equivalent functionality.

Compare the performance of instances of PEa depending on whether or not there is an instance of PEb in the same DIVM.
Usage: To find out whether co-locating two PE instances on the same DIVM will result in loss of performance. This query can guide the decision of whether to split the workflow across multiple DIVMs to speed up the performance.

If two instances are associated with the same request_id, then we can infer that they coexist at the same time and potentially overlap in their use of a DIVM. This is achieved with the following queries (an SQL sketch follows this list):
— Construct a view, PEIonDIVM, that joins data from related tables.
— Construct a view, Co_located, to compute a subset of PEIonDIVM where the request_id and divm are equal.
— Construct another view, Not_co_located, to compute a subset of PEIonDIVM where the request_id and divm are not equal.
— Finally, compute the performance from the UNION of both views above.
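A sketch of these views in SQL, assuming hypothetical tables pei_deployment(request_id, pei_id, pe_class, divm_id) and pei_timing(pei_id, elapsed_time) and using 'PEa' and 'PEb' as placeholders for the two PE classes, is given below; the prototype's actual schema and queries may differ:

```sql
-- View joining deployment and timing data for every PEI.
CREATE VIEW PEIonDIVM AS
SELECT d.request_id, d.pei_id, d.pe_class, d.divm_id, t.elapsed_time
FROM pei_deployment d
JOIN pei_timing t ON t.pei_id = d.pei_id;

-- Instances of PEa that share a request and a DIVM with an instance of PEb.
CREATE VIEW Co_located AS
SELECT DISTINCT a.request_id, a.pei_id, a.pe_class, a.divm_id, a.elapsed_time
FROM PEIonDIVM a
JOIN PEIonDIVM b
  ON a.request_id = b.request_id
 AND a.divm_id    = b.divm_id
WHERE a.pe_class = 'PEa'
  AND b.pe_class = 'PEb';

-- Instances of PEa whose request also ran an instance of PEb, but on a different DIVM.
CREATE VIEW Not_co_located AS
SELECT DISTINCT a.request_id, a.pei_id, a.pe_class, a.divm_id, a.elapsed_time
FROM PEIonDIVM a
JOIN PEIonDIVM b
  ON a.request_id = b.request_id
 AND a.divm_id   <> b.divm_id
WHERE a.pe_class = 'PEa'
  AND b.pe_class = 'PEb';

-- Compare the performance of the two situations.
SELECT 'co-located' AS situation, AVG(elapsed_time) AS avg_elapsed_time
FROM Co_located
UNION
SELECT 'not co-located' AS situation, AVG(elapsed_time) AS avg_elapsed_time
FROM Not_co_located;
```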