
In addition to the pipe design, I am interested in two other features of Stream: lazy evaluation and parallel streams. Lazy evaluation means that a computation is delayed until its result is actually needed, and a parallel stream can execute the entire pipe in parallel. Most importantly, these two features are optimized for the JVM and should be more efficient than anything we implement ourselves.
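Lazy evaluation is easy to demonstrate. The following minimal sketch (my own illustration, not part of the experiment code) shows that an intermediate operation such as map(Function) does not run until a terminal operation such as collect(Collector) is invoked:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyEvaluationDemo {

    public static void main(String[] args) {
        // Building the pipe does not execute map(); intermediate operations
        // are only recorded until a terminal operation is invoked.
        Stream<String> pipe = Stream.of("a", "b", "c")
                .map(s -> {
                    System.out.println("mapping " + s);
                    return s.toUpperCase();
                });
        System.out.println("pipe built, nothing mapped yet");

        // collect() is the terminal operation that triggers the evaluation,
        // so the "mapping ..." messages appear only after this point.
        List<String> upperCased = pipe.collect(Collectors.toList());
        System.out.println(upperCased); // [A, B, C]
    }
}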

When I saw lazy evaluation, I thought it could benefit loading large files, e.g., by reducing memory consumption. However, I was not sure about that, so I designed an experiment to confirm the thought. First, an interface FileSearchStrategy is created (Code List 1); its method accepts a file (or folder), a keyword, and a result collector (SearchResultCollector). Each implementation can use a different methodology to search for the keyword in a file and put the result, a tuple <filename, line number, line content>, into the collector.

Code List 1 - FileSearchStrategy Interface
package java8.stream;

import java.io.File;

public interface FileSearchStrategy {

    public void search(File root, String keyword, SearchResultCollector collector);
}
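
The post does not list SearchResultCollector and KeywordSearchResult. A minimal sketch of the two classes, inferred from how the later code lists use them, could look like the following (the original implementations may differ; for instance, SearchResultCollector might be an interface so that SearchResultTableModel in Code List 9 can implement it):

package java8.stream;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Collects the search results and counts the processed files.
public class SearchResultCollector {

    private final List<KeywordSearchResult> _results = new ArrayList<>();
    private long _processedFileCount;

    public void increaseFileCount() { _processedFileCount++; }

    public void add(KeywordSearchResult result) { _results.add(result); }

    public void addAll(Collection<KeywordSearchResult> results) { _results.addAll(results); }

    public List<KeywordSearchResult> getResults() { return _results; }

    public long getProcessedFileCount() { return _processedFileCount; }

    public void clear() {
        _results.clear();
        _processedFileCount = 0;
    }
}

// The tuple <filename, line number, line content>.
class KeywordSearchResult {

    private final String _path;
    private final String _line;
    private final long _lineNumber;

    public KeywordSearchResult(String path, String line, long lineNumber) {
        _path = path;
        _line = line;
        _lineNumber = lineNumber;
    }

    public String getKeywordAppearedLine() { return _line; }

    public long getLineNumber() { return _lineNumber; }
}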

A File object in Java can point to either a file or a folder. Thus, AbstractSearchStrategy (Code List 2) implements the search(File, String, SearchResultCollector) method of FileSearchStrategy to traverse folders recursively, and declares a hook method for concrete classes to scan the content of an actual file.

Code List 2 - AbstractSearchStrategy handles the directory traversal
package java8.stream;

import java.io.File;

public abstract class AbstractSearchStrategy implements FileSearchStrategy {

    @Override
    public void search(File root, String keyword, SearchResultCollector collector) {
        if(root.isDirectory()) {
            File[] files = root.listFiles();
            if(files != null) {
                for(File file : files) {
                    search(file, keyword, collector);
                }
            }
        }
        else {
            collector.increaseFileCount();
            scanKeyword(root, keyword, collector);
        }
    }

    protected abstract void scanKeyword(File file, String keyword, SearchResultCollector collector);
}

The infrastructure is complete. The first strategy implementation, DefaultSearchStrategy (Code List 3), uses the algorithm commonly applied to text files before the Stream API existed: scan line by line. Its result also serves as the baseline of the experiment.

Code List 3 - The traditional text file reading
package java8.stream;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

public class DefaultSearchStrategy extends AbstractSearchStrategy {

    @Override
    protected void scanKeyword(File file, String keyword, SearchResultCollector collector) {
        String path = file.getName();
        try (FileReader fileReader = new FileReader(file);
            BufferedReader reader = new BufferedReader(fileReader)) {
            String line = null;
            long lineNumber = 1;
            while((line = reader.readLine()) != null) {
                if(line.contains(keyword)) {
                    collector.add(new KeywordSearchResult(path, line, lineNumber));
                }
                lineNumber++;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Next, the AllLinesSearchStrategy implementation uses the readAllLines(Path) method of the Files class, which came with the NIO.2 (New I/O 2) package in Java 7 (Code List 4). In fact, the Javadoc says the method is intended for simple cases where it is convenient to read all lines in a single operation, and is not intended for reading large files. Therefore, this strategy should be the worst case in the experiment.

Code List 4 - The method that uses Files.readAllLines(Path)
package java8.stream;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;

public class AllLinesSearchStrategy extends AbstractSearchStrategy {

    @Override
    protected void scanKeyword(File file, String keyword, SearchResultCollector collector) {
        String path = file.getName();
        try {
            List<String> lines = Files.readAllLines(file.toPath());
            int linesCount = lines.size();
            for(int index = 0; index < linesCount; index++) {
                String line = lines.get(index);
                if(line.contains(keyword)) {
                    // List indices are 0-based, line numbers 1-based.
                    collector.add(new KeywordSearchResult(path, line, index + 1));
                }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

For convenience in the later parallel stream experiment, the StreamSearchStrategy implementation puts the pipe combination into a separate method, scanStream(Stream, String, SearchResultCollector) (Code List 5), and uses the lines() method of the BufferedReader class to obtain the Stream as input. To carry the line number, the intermediate operation map(Function) transforms each string into an object consisting of a line number and the line content (reusing KeywordSearchResult for simplicity), and filter(Predicate) then drops the objects whose line does not contain the keyword.

Code List 5 - The strategy that uses Stream to traverse directories and scan file
package java8.stream;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamSearchStrategy extends AbstractSearchStrategy
    implements Function<String, KeywordSearchResult> {

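    // Note: apply() below mutates these fields for every line, so this
    // Function is stateful and not thread-safe.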
    protected long _lineCounting;
    protected String _scanningPath;

    @Override
    public void scanKeyword(File file, String keyword, SearchResultCollector collector) {
        _scanningPath = file.getName();
        try (FileReader fileReader = new FileReader(file);
            BufferedReader reader = new BufferedReader(fileReader)) {
            scanStream(reader.lines(), keyword, collector);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void scanStream(Stream<String> stream, String keyword, SearchResultCollector collector) {
        _lineCounting = 1;
        collector.addAll(stream
                            .map(this)
                            .filter(r -> r.getKeywordAppearedLine().contains(keyword))
                            .collect(Collectors.toList()));
    }

    @Override
    public KeywordSearchResult apply(String line) {
        return new KeywordSearchResult(_scanningPath, line, _lineCounting++);
    }
}

Since the line number is required, the scanStream(Stream, String, SearchResultCollector) method of the StreamSearchStrategy class uses map(Function) before filter(Predicate), so every line is wrapped in a KeywordSearchResult even when it does not match. If the line number can be ignored, the StreamSearchStrategyV2 class (Code List 6) can put filter(Predicate) before map(Function), so that only matching lines are wrapped. Does the order affect the result?

Code List 6 - The strategy that ignores the line number
package java8.stream;

import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamSearchStrategyV2 extends StreamSearchStrategy {

    @Override
    public void scanStream(Stream<String> stream, String keyword, SearchResultCollector collector) {
        collector.addAll(stream
                            .filter(s -> s.contains(keyword))
                            .map(s -> new KeywordSearchResult(_scanningPath, s, 0))
                            .collect(Collectors.toList()));
    }
}

System memory has grown large, and the operating system usually caches file content, so reading the same file again is much faster. That acceleration would skew the measurements. To avoid reading file content from the cache, the experiment prepares three folders, each with the same file structure and content, as listed in Table 1. For example, sub-folder A has 10 files with 100k records per file (3.62 MB each), and sub-folders B to F follow the same pattern with larger files. Sub-folder G holds a copy of sub-folders A ~ F; as a result, each of the three folders contains 120 files, totaling 4.45 GB.

Table 1 - Test data

Sub Folder    Records/File    Files    Size/File (MB)
A             100k            10       3.62
B             200k            10       7.24
C             400k            10       14.4
D             800k            10       28.9
E             1600k           10       57.9
F             3200k           10       115
G             a copy of A + B + C + D + E + F
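
The post does not show how the test data was generated. A hypothetical generator (the class name and record format are my assumptions; each record is written as a single short line, which roughly matches 3.62 MB for 100k records) might look like this:

import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class TestDataGenerator {

    // Hypothetical: writes 'fileCount' files of 'recordCount' lines each
    // into 'folder'. The real record format is not shown in this post.
    public static void generate(Path folder, int fileCount, int recordCount) throws IOException {
        Files.createDirectories(folder);
        for (int f = 0; f < fileCount; f++) {
            Path file = folder.resolve("data-" + f + ".txt");
            try (PrintWriter writer = new PrintWriter(Files.newBufferedWriter(file))) {
                for (int r = 0; r < recordCount; r++) {
                    writer.printf("record %08d abcdefghijklmnopqrst%n", r);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Sub-folder A of Table 1: 10 files, 100k records per file.
        generate(Paths.get("testdata", "A"), 10, 100_000);
    }
}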

However, whether this cache-avoidance trick works depends on the size of the system memory. The test environment is listed in Table 2, and the test process is shown in Code List 7. Each strategy searches for the keyword in the three prepared folders in order. When the second strategy starts on the first folder, the content of the other two folders has just been loaded into memory by the first strategy; those two folders total 8.9 GB, which is larger than the 8 GB of system memory, so the content of the first folder is no longer in the cache. A MemoryUsageMonitor object periodically samples the memory usage of the JVM and records the peak value during each run.

Table 2 - Test environment

Item      Specification
CPU       Intel Core i5-2400
Memory    8 GB
HDD       Seagate ST3160815AS 160 GB
OS        Windows 7 SP1
JVM       1.8.0-b132

Code List 7 - Test job scheduling
public void runJobs(File[] jobs, String keyword) {
    Set<String> keys = _strategies.keySet();
    System.out.println("Strategy, Time (ms), Files, Folder, Found, Memory");
    for(String key : keys) {
        FileSearchStrategy strategy = _strategies.get(key);
        for(File job : jobs) {
            runJob(job, strategy, keyword);
        }
    }
}

private void runJob(File job, FileSearchStrategy strategy, String keyword) {
    _results.clear();
    _largestMemoryUsage = 0;
    long startTime = System.currentTimeMillis();
    MemoryUsageMonitor.getInstance().startMonitor();
    strategy.search(job, keyword, _results);
    long time = System.currentTimeMillis() - startTime;
    MemoryUsageMonitor.getInstance().stopMonitor();
    int matchCount = _results.getResults().size();
    long filesCount = _results.getProcessedFileCount();
    String jobName = job.getName();
    String strategyName = strategy.getClass().getSimpleName();
    String result = String.format("%s, %d, %d, %s, %d, %d", strategyName, time, filesCount, jobName, matchCount, _largestMemoryUsage);
    System.out.println(result);
    System.gc();
}
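
MemoryUsageMonitor is not listed either. A minimal sketch, assuming it samples the JVM heap on a timer and keeps the peak value (how that value reaches _largestMemoryUsage is not shown in the post), could be:

package java8.stream;

import java.util.Timer;
import java.util.TimerTask;

// Samples the JVM heap periodically and records the peak usage.
public class MemoryUsageMonitor {

    private static final MemoryUsageMonitor INSTANCE = new MemoryUsageMonitor();

    private Timer _timer;
    private volatile long _peakUsage;

    public static MemoryUsageMonitor getInstance() {
        return INSTANCE;
    }

    public void startMonitor() {
        _peakUsage = 0;
        _timer = new Timer("memory-monitor", true);
        _timer.schedule(new TimerTask() {
            @Override
            public void run() {
                Runtime runtime = Runtime.getRuntime();
                long used = runtime.totalMemory() - runtime.freeMemory();
                if (used > _peakUsage) {
                    _peakUsage = used;
                }
            }
        }, 0, 100); // sample every 100 ms (the interval is an assumption)
    }

    public void stopMonitor() {
        if (_timer != null) {
            _timer.cancel();
        }
    }

    public long getPeakUsage() {
        return _peakUsage;
    }
}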

Well, it is time to show the results! They are listed in Table 3. As expected, the All Lines strategy uses the most memory (over 1 GB) and takes the longest time (about 20 seconds longer than the other strategies). However, the differences among Default, Stream, and Stream v2 are not significant (about 3 seconds). The memory usage of the Stream v2 and Default strategies is almost the same, but running map(Function) before filter(Predicate), as the Stream strategy does, seems to carry a noticeable memory cost.

Table 3 - Test result of four strategies

Strategy     Time (ms)    Memory (MB)
Default      106788       42.8
Stream       109272       57.4
Stream v2    109402       41.7
All Lines    128749       1140.9

The AbstractSearchStrategy uses a traditional for-each loop to traverse the files. Would a parallel stream bring a benefit, or make things worse? To find out, the implementation of the search(File, String, SearchResultCollector) method of the AbstractSearchStrategy class is changed to Code List 8, and the experiment runs again.

Code List 8 - The directories traversal with Stream API
@Override
public void search(File root, String keyword, SearchResultCollector collector) {
    if(root.isDirectory()) {
        // Files.list(Path) returns a Stream holding an open directory handle,
        // so it is closed with try-with-resources.
        try (Stream<Path> entries = Files.list(root.toPath())) {
            entries.parallel()
                   .forEach(p -> search(p.toFile(), keyword, collector));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    else {
        collector.increaseFileCount();
        scanKeyword(root, keyword, collector);
    }
}

I supposed the parallel stream could speed up the search. However, as Table 4 shows, it does not; it makes things worse, with huge memory consumption. I am also surprised that the memory usage of the Stream v2 strategy now exceeds that of the Stream strategy, and I do not know how to explain this phenomenon.

Table 4 - Test result of four strategies with parallel directories traversal

Strategy     Time (ms)    Memory (MB)
Default      109386       281.7
Stream       109230       397.1
Stream v2    108978       487.5
All Lines    122702       1441.9

From these results, when I/O is involved, a pipe built with Stream neither speeds up nor uses less memory, even with a parallel stream; sometimes it is even slower. But what if the data is not in files on a hard disk? What is the effect on data already in memory? A third experiment is designed for this: 100k ~ 6400k records are kept in an ArrayList, and based on a parameter (Code List 9), either parallelStream() or stream() is used to obtain the input of scanStream(Stream, String, SearchResultCollector).

Code List 9 - Testing the parallel stream
public void runTests(int times, String keyword) {
    _testTimes = times;
    StreamSearchStrategy strategy = new StreamSearchStrategy();
    StreamSearchStrategyV2 strategy2 = new StreamSearchStrategyV2();
    System.out.println("Strategy, Time, Parallel, Collection Size, Match, Memory");
    for(int index = 0; index < _testTimes; index++) {
        Collection<String> collection = createCollection(_collectionSize);
        runTest(new ArrayList<String>(collection), false, strategy, keyword);
        runTest(new ArrayList<String>(collection), true, strategy, keyword);
        runTest(new ArrayList<String>(collection), false, strategy2, keyword);
        runTest(new ArrayList<String>(collection), true, strategy2, keyword);
        _collectionSize = _collectionSize * 2;
    }
}
private void runTest(Collection<String> collection, boolean useParallel, StreamSearchStrategy strategy, String keyword) {
    SearchResultTableModel results = new SearchResultTableModel();
    Stream<String> stream = useParallel? collection.parallelStream() : collection.stream();
    long startTime = System.currentTimeMillis();
    strategy.scanStream(stream, keyword, results);
    long time = System.currentTimeMillis() - startTime;
    int collectionSize = collection.size();
    int matchCount = results.getResults().size();
    String strategyName = strategy.getClass().getSimpleName();
    String result = String.format("%s, %d, %s, %d, %d", strategyName, time, useParallel, collectionSize, matchCount);
    System.out.println(result);
}
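
The createCollection(int) helper is not listed. A hypothetical version (assuming java.util.ArrayList and java.util.List are imported, and that roughly one line per thousand contains the keyword) could be:

// Hypothetical: fills a list with synthetic lines, a few of which
// contain the keyword so that the filter has something to match.
private Collection<String> createCollection(int size) {
    List<String> lines = new ArrayList<>(size);
    for (int index = 0; index < size; index++) {
        String suffix = (index % 1000 == 0) ? " keyword" : " filler";
        lines.add("record " + index + suffix);
    }
    return lines;
}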

The results are listed in Table 5. Note that in the 100k column, regardless of whether the Stream strategy or the Stream v2 strategy runs first, the strategy that runs first always gets a bad result; the cause is probably the cold start of the program (e.g., JIT warm-up), so the 100k column is ignored. Starting from 200k, both the Stream and Stream v2 strategies benefit from parallelStream(), which reduces the execution time considerably; in the 6400k column, Stream v2 with a parallel stream saves 136 ms. In general, the performance of the Stream v2 strategy is better than that of the Stream strategy.

Table 5 - The execution time (ms) with parallel stream

Strategy     Parallel    100k*    200k    400k    800k    1600k    3200k    6400k
Stream       Off         47       8       15      30      57       187      342
Stream       On          29       5       11      21      42       83       165
Stream v2    Off         12       7       12      24      47       93       192
Stream v2    On          2        3       5       9       15       29       56

* The 100k column is ignored because of the cold-start effect described above.

It is time to draw a conclusion. First, to benefit from lazy evaluation, the pipe must be designed carefully; a bad pipe design makes performance worse, and I/O-bound work gains little from lazy evaluation. Second, a parallel stream can speed up processing for some kinds of data sources, but not all: an I/O-bound source, or one whose access may involve race conditions, is not sped up, whereas data that is already in memory and can be accessed without locks benefits a lot. However, a parallel stream also increases memory usage, so it should be used carefully.

P.S. The source code is still being organized. When that is done, it will be published on GitHub.
