about 4 years ago

除了Pipe的設計外,Stream另外讓我好奇的二個特色:lazy evaluationparallel stream。lazy evaluation可以想成是延後運算到真正必要的時候,而parallel stream則是將Pipe以平行運算的方式進行,最重要的是這二個特色都是針對JVM最佳化過的,應該比我們自己寫來的更有效率。

當看到lazy evaluation我最先想到的是,應該對載入大型檔案有幫助,例如:減少記憶體使用量,但我沒把握這想法是否正確,所以設計了一個實驗試試看我的想法。首先,設計一個FileSearchStrategy介面(Code List 1),可以輸入檔案(目錄)、關鍵字和結果收集器(SearchResultCollector),每個實作可以用不同的方式從檔案中搜尋關鍵字,並將結果<檔案名稱、行數、該行內容>存放到收集器中。

Code List 1 - FileSearchStrategy Interface
package java8.stream;

import java.io.File;

public interface FileSearchStrategy {

    public void search(File root, String keyword, SearchResultCollector collector);
}

由於Java的File可能指向一個檔案或目錄,因此AbstractSearchStrategy (Code List 2)實作FileSearchStrategysearch(File, String, SearchResultCollector),以遞迴的方式走訪每一層目錄,並留一個hook method讓繼承者提供實際掃描檔案的實作。

Code List 2 - AbstractSearchStrategy handles the directory traversal
package java8.stream;

import java.io.File;

public abstract class AbstractSearchStrategy implements FileSearchStrategy {

    @Override
    public void search(File root, String keyword, SearchResultCollector collector) {
        if(root.isDirectory()) {
            File[] files = root.listFiles();
            if(files != null) {
                for(File file : files) {
                    search(file, keyword, collector);
                }
            }
        }
        else {
            collector.increaseFileCount();
            scanKeyword(root, keyword, collector);
        }
    }

    protected abstract void scanKeyword(File file, String keyword, SearchResultCollector collector);
}

基礎架構完成後,第一個預設實作DefaultSearchStrategy (Code List 3)是在還沒有Stream API之前,針對文字檔案常用的演算法:逐行掃描。這個實作的結果即是實驗的基準值。

Code List 3 - The tranditional text file reading
package java8.stream;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

public class DefaultSearchStrategy extends AbstractSearchStrategy {

    @Override
    protected void scanKeyword(File file, String keyword, SearchResultCollector collector) {
        String path = file.getName();
        try (FileReader fileReader = new FileReader(file);
            BufferedReader reader = new BufferedReader(fileReader)) {
            String line = null;
            long lineNumber = 1;
            while((line = reader.readLine()) != null) {
                if(line.contains(keyword)) {
                    collector.add(new KeywordSearchResult(path, line, lineNumber));
                }
                lineNumber++;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

接著AllLinesSearchStrategy的實作(Code List 4),使用在Java 7推出的NIO 2 (New I/O 2)套件所提供的FilesreadAllLines(Path)函式,事實上,在Java Doc的說明中,這個函式只適合用在簡單的案例,並不適合用在大檔案上,因此,這個實作應該會得到實驗中最差的結果。

Code List 4 - The method that uses Files.readAllLines(Path)
package java8.stream;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;

public class AllLinesSearchStrategy extends AbstractSearchStrategy {

    @Override
    protected void scanKeyword(File file, String keyword, SearchResultCollector collector) {
        String path = file.getName();
        try {
            List<String> lines = Files.readAllLines(file.toPath());
            int linesCount = lines.size();
            for(int index = 1; index < linesCount; index++) {
                String line = lines.get(index);
                if(line.contains(keyword)) {
                    collector.add(new KeywordSearchResult(path, line, index));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

為了方便後續平行處理的實驗,StreamSearchStrategy的實作(Code List 5),將實際Pipe的運算組成放到另一個函式:scanStream(Stream, String, SearchResultCollector)中,然後使用BufferedReaderlines()函式取得Stream物件進行運算。為了取得行號,Pipe的第一個intermediate operation使用map(Function),將字串轉成同時帶有行號與字串內容的物件(使用KeywordSearchResult只是簡化實作),然後再用filter(Predicate)過濾掉不要的物件。

Code List 5 - The strategy that uses Stream to traverse directories and scan file
package java8.stream;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamSearchStrategy extends AbstractSearchStrategy
    implements Function<String, KeywordSearchResult> {

    protected long _lineCounting;
    protected String _scanningPath;

    @Override
    public void scanKeyword(File file, String keyword, SearchResultCollector collector) {
        _scanningPath = file.getName();
        try (FileReader fileReader = new FileReader(file);
            BufferedReader reader = new BufferedReader(fileReader)) {
            scanStream(reader.lines(), keyword, collector);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void scanStream(Stream<String> stream, String keyword, SearchResultCollector collector) {
        _lineCounting = 1;
        collector.addAll(stream
                            .map(this)
                            .filter(r -> r.getKeywordAppearedLine().contains(keyword))
                            .collect(Collectors.toList()));
    }

    @Override
    public KeywordSearchResult apply(String line) {
        return new KeywordSearchResult(_scanningPath, line, _lineCounting++);
    }
}

為了取得行號,所以StreamSearchStrategyscanStream(Stream, String, SearchResultCollector)中Pipe組成是先用map(Function)再用filter(Predicate)。若忽略行號,改成Code List 6的StreamSearchStrategyV2,先使用filter(Predicate)再使用map(Function),對結果會有影響嗎?

Code List 6 - The strategy that ignores the line number
package java8.stream;

import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamSearchStrategyV2 extends StreamSearchStrategy {

    public void scanStream(Stream<String> stream, String keyword, SearchResultCollector collector) {
        collector.addAll(stream
                            .filter(s -> s.contains(keyword))
                            .map(s -> new KeywordSearchResult(_scanningPath, s, 0))
                            .collect(Collectors.toList()));
    }
}

目前系統記憶體越來越大,作業系統常會將檔案內容快取在系統記憶體中,當第二次讀取相同檔案時,速度可以加速許多,但對實驗來說,這會是個影響數據的關鍵,為了避免讀到快取的檔案內容,實驗準備了三個資料夾,每個資料夾放入Table 1所述相同的檔案結構,以A子資料夾為例,一個檔案有100k行(筆)資料,大小約3.62 MB,這樣的檔案有10個檔案,B、C等子資料夾依此類推,G子資料夾則是將A~F子資料夾複製一份放入,120個檔案合計4.45 GB。

Table 1 - Test data

Sub Folder Records File Amount Size/File (MB)
A 100k 10 3.62
B 200k 10 7.24
C 400k 10 14.4
D 800k 10 28.9
E 1600k 10 57.9
F 3200k 10 115
G A + B + C + D + E + F

當然,這是否可行要看系統記憶體的多寡,實驗的環境如Table 2所列,而測試方法如Code List 7,每種strategy依序掃描三個資料夾,當第二個strategy開始掃描第一個資料夾時,因為其他兩個資料夾在執行前一個strategy載入,總量有8.9 GB,已超過系統記憶體,第一個資料夾內的內容應該已不在快取中。實驗使用一個MemoryUsageMonitor的物件,定期監控JVM的記憶體使用量,並記錄每次掃描的峰值。

Table 2 - Test environment

Hardware Specification
CPU Intel Core i5-2400
Memory 8GB
HDD Seagate ST3160815AS 160GB
OS Windows 7 SP1
JVM 1.8.0-b132
Code List 7 - Test jobs schedule
public void runJobs(File[] jobs, String keyword) {
    Set<String> keys = _strategies.keySet();
    System.out.println("Strategy, Time (ms), Files, Folder, Found, Memory");
    for(String key : keys) {
        FileSearchStrategy strategy = _strategies.get(key);
        for(File job : jobs) {
            runJob(job, strategy, keyword);
        }
    }
}

private void runJob(File job, FileSearchStrategy strategy, String keyword) {
    _results.clear();
    _largestMemoryUsage = 0;
    long startTime = System.currentTimeMillis();
    MemoryUsageMonitor.getInstance().startMonitor();
    strategy.search(job, keyword, _results);
    long time = System.currentTimeMillis() - startTime;
    MemoryUsageMonitor.getInstance().stopMonitor();
    int matchCount = _results.getResults().size();
    long filesCount = _results.getProcessedFileCount();
    String jobName = job.getName();
    String strategyName = strategy.getClass().getSimpleName();
    String result = String.format("%s, %d, %d, %s, %d, %d", strategyName, time, filesCount, jobName, matchCount, _largestMemoryUsage);
    System.out.println(result);
    System.gc();
}

好啦!該是公布測試結果(Table 3)的時候了,All Lines果然如預期般使用最多的記憶體(超過1GB),所花費的時間也是最長的,多了20秒左右,但Default、Stream和Stream v2之間的差異不大,就執行時間上,三者的差距大約3秒,而記憶體的使用量Stream v2和Default幾乎是一樣,但Stream一開始的map(Function)似乎是致命傷。

Table 3 - Test result of four strategies

Strategy Time (ms) Memory (MB)
Default 106788 42.8
Stream 109272 57.4
Stream v2 109402 41.7
All Lines 128749 1140.9

先前AbstractSearchStrategy使用的是傳統foreach方式走訪所有的檔案,那如果使用Stream的parallel會有幫助嗎?還是更糟?所以,將AbstractSearchStrategysearch(File, String, SearchResultCollector)改成Code List 8後,再次執行測試。

Code List 8 - The directories traversal with Stream API
@Override
public void search(File root, String keyword, SearchResultCollector collector) {
    if(root.isDirectory()) {
        try {
            Files.list(root.toPath())
                    .parallel()
                    .forEach(p -> search(p.toFile(), keyword, collector));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    else {
        collector.increaseFileCount();
        scanKeyword(root, keyword, collector);
    }
}

原先期望透過parallel stream的方式加快執行速度,但從Table 4看來,幾乎沒有加速,反而還帶來了反效果:記憶體使用量暴增。令人意外的是Stream v2的記憶體使用量比Stream還多,不知該怎麼解釋。

Table 4 - Test result of four strategies with parallel directories traversal

Strategy Time (ms) Memory (MB)
Default 109386 281.7
Stream 109230 397.1
Stream v2 108978 487.5
All Lines 122702 1441.9

似乎只要和I/O扯上關係,即使用平行運算的方式也沒有減少太多的執行時間,有時候反而還更慢,那如果資料不在硬碟的檔案裡,都已經在記憶體中,那效果會如何?因此,設計了第三個實驗:100k ~ 6400k筆資料存放在ArrayList中,然後根據參數使用parallelStream()stream()作為scanStream(Stream, String, SearchResultCollector)的輸入(Code List 9)。

Code List 9 - Testing the parallel stream
public void runTests(int times, String keyword) {
    _testTimes = times;
    StreamSearchStrategy strategy = new StreamSearchStrategy();
    StreamSearchStrategyV2 strategy2 = new StreamSearchStrategyV2();
    System.out.println("Strategy, Time, Parallel, Collection Size, Match, Memory");
    for(int index = 0; index < _testTimes; index++) {
        Collection<String> collection = createCollection(_collectionSize);
        runTest(new ArrayList<String>(collection), false, strategy, keyword);
        runTest(new ArrayList<String>(collection), true, strategy, keyword);
        runTest(new ArrayList<String>(collection), false, strategy2, keyword);
        runTest(new ArrayList<String>(collection), true, strategy2, keyword);
        _collectionSize = _collectionSize * 2;
    }
}
private void runTest(Collection<String> collection, boolean useParallel, StreamSearchStrategy strategy, String keyword) {
    SearchResultTableModel results = new SearchResultTableModel();
    Stream<String> stream = useParallel? collection.parallelStream() : collection.stream();
    long startTime = System.currentTimeMillis();
    strategy.scanStream(stream, keyword, results);
    long time = System.currentTimeMillis() - startTime;
    int collectionSize = collection.size();
    String strategyName = strategy.getClass().getSimpleName();
    String result = String.format("%s, %d, %s, %d", strategyName, time, String.valueOf(useParallel), collectionSize);
    System.out.println(result);
}

實驗結果列於Table 5,100k一欄,不論是Stream或Stream v2哪個先執行都會得到不理想的數據,可能是程式冷啟動所引起,所以忽略100k欄的數值。從200k開始,不論使用Stream或Stream v2,都可看到當使用parallelStream()時,執行時間有明顯的減少,在Stream v2的6400k一欄,節省了136 ms。而且整體來說,Stream v2也明顯比Stream要好。

Table 5 - The execution time (ms) with parallel stream

Strategy Parallel 100k* 200k 400k 800k 1600k 3200k 6400k
Stream Close 47 8 15 30 57 187 342
Stream Open 29 5 11 21 42 83 165
Stream v2 Close 12 7 12 24 47 93 192
Stream v2 Open 2 3 5 9 15 29 56

該是結論的時候了,首先,lazy evaluation的效益必須是在pipe的組合上有最佳化過的,若組合的不好反而更糟糕,且在I/O上幫助似乎也不大。parallel stream要能發揮效果必須看資料的來源類型,I/O類型或是存取上有競爭現象的資料較難發揮出效益,但若是在記憶體當中的資料,彼此無存取競爭(不用使用lock)的現象,那parallel stream的效果就相當明顯,不過要注意的是parallel stream也會使記憶體的使用量增加,使用上也要小心。

ps. 測試程式碼還在整理中,若整理完會公開到GitHub上。

← Quick Glance of Java 8 - Stream Quick Glance of Java 8 - Lazy Evaluation & Parallel Stream →