猿问

使用 C# 读取数百万个小文件

我有数百万个每天生成的日志文件,我需要读取所有这些文件并将其放在一起作为单个文件,以便在其他应用程序中对其进行一些处理。


我正在寻找最快的方法来做到这一点。目前我正在使用线程、任务和并行,如下所示:


Parallel.For(0, files.Length, new ParallelOptions { MaxDegreeOfParallelism = 100 }, i =>

{

    ReadFiles(files[i]);

});


void ReadFiles(string file)

{

    try

    {

        var txt = File.ReadAllText(file);

        filesTxt.Add(tmp);

    }

    catch { }

    GlobalCls.ThreadNo--;

}

或者


foreach (var file in files)

{

    //Int64 index = i;

    //var file = files[index];

    while (Process.GetCurrentProcess().Threads.Count > 100)

    { 

        Thread.Sleep(100);

        Application.DoEvents();

    }

    new Thread(() => ReadFiles(file)).Start();

    GlobalCls.ThreadNo++;

    // Task.Run(() => ReadFiles(file));      

}

问题是,读取几千个文件后,读取速度越来越慢!


知道为什么吗?读取数百万个小文件的最快方法是什么?谢谢。


宝慕林4294392
浏览 198回答 3
3回答

呼如林

看起来您正在将所有文件的内容加载到内存中,然后再将它们写回单个文件。这可以解释为什么这个过程随着时间的推移变得更慢。优化该过程的一种方法是将读取部分与写入部分分开,并并行进行。这称为生产者-消费者模式。Parallel它可以使用类、线程或任务来实现,但我将演示基于强大的TPL 数据流库的实现,该库特别适合此类作业。private static async Task MergeFiles(IEnumerable<string> sourceFilePaths,    string targetFilePath, CancellationToken cancellationToken = default,    IProgress<int> progress = null){    var readerBlock = new TransformBlock<string, string>(async filePath =>    {        return File.ReadAllText(filePath); // Read the small file    }, new ExecutionDataflowBlockOptions()    {        MaxDegreeOfParallelism = 2, // Reading is parallelizable        BoundedCapacity = 100, // No more than 100 file-paths buffered        CancellationToken = cancellationToken, // Cancel at any time    });    StreamWriter streamWriter = null;    int filesProcessed = 0;    var writerBlock = new ActionBlock<string>(text =>    {        streamWriter.Write(text); // Append to the target file        filesProcessed++;        if (filesProcessed % 10 == 0) progress?.Report(filesProcessed);    }, new ExecutionDataflowBlockOptions()    {        MaxDegreeOfParallelism = 1, // We can't parallelize the writer        BoundedCapacity = 100, // No more than 100 file-contents buffered        CancellationToken = cancellationToken, // Cancel at any time    });    readerBlock.LinkTo(writerBlock,        new DataflowLinkOptions() { PropagateCompletion = true });    // This is a tricky part. We use BoundedCapacity, so we must propagate manually    // a possible failure of the writer to the reader, otherwise a deadlock may occur.    PropagateFailure(writerBlock, readerBlock);    // Open the output stream    using (streamWriter = new StreamWriter(targetFilePath))    {        // Feed the reader with the file paths        foreach (var filePath in sourceFilePaths)        {            var accepted = await readerBlock.SendAsync(filePath,                cancellationToken); // Cancel at any time            if (!accepted) break; // This will happen if the reader fails        }        readerBlock.Complete();        await writerBlock.Completion;    }    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)    {        try { await block1.Completion.ConfigureAwait(false); }        catch (Exception ex)        {            if (block1.Completion.IsCanceled) return; // On cancellation do nothing            block2.Fault(ex);        }    }}使用示例:var cts = new CancellationTokenSource();var progress = new Progress<int>(value =>{    // Safe to update the UI    Console.WriteLine($"Files processed: {value:#,0}");});var sourceFilePaths = Directory.EnumerateFiles(@"C:\SourceFolder", "*.log",    SearchOption.AllDirectories); // Include subdirectoriesawait MergeFiles(sourceFilePaths, @"C:\AllLogs.log", cts.Token, progress);BoundedCapacity用于控制内存使用。如果磁盘驱动器是SSD,您可以尝试使用MaxDegreeOfParallelism大于2的值读取。为了获得最佳性能,您可以考虑写入与包含源文件的驱动器不同的磁盘驱动器。TPL 数据流库可作为.NET Framework 的包提供,并且内置于 .NET Core。

智慧大石

当涉及到IO操作时,CPU并行是没有用的。您的 IO 设备(磁盘、网络等)是您的瓶颈。同时从设备读取数据可能会降低性能。

MYYA

也许您可以只使用 PowerShell 来连接文件,另一种替代方法是编写一个程序,使用FileSystemWatcher类来监视新文件并在创建时追加它们。
随时随地看视频慕课网APP
我要回答