c# - Exception using Rx and Await to accomplish reading file line by line async -


I am learning to use Rx and tried this sample, but I could not fix the exception that happens in the highlighted while statement - while (!f.EndOfStream).

I want to read a huge file line by line and, for every line of data, do some processing on a different thread (so I used ObserveOn). I want the whole thing to be async. I want to use ReadLineAsync since it returns a Task, which I can convert to an observable and subscribe to.

I guess the task thread that I create first gets in between the Rx threads. But even if I observe and subscribe using CurrentThread, I still cannot stop the exception. I wonder how to accomplish this neatly and asynchronously with Rx.

I am also wondering whether the whole thing could be done more simply.

    static void Main(string[] args)
    {
        RxWrapper.ReadFileWithRxAsync();
        Console.WriteLine("This should be called before the file read begins");
        Console.ReadLine();
    }

    public static async Task ReadFileWithRxAsync()
    {
        Task t = Task.Run(() => ReadFileWithRx());
        await t;
    }

    public static void ReadFileWithRx()
    {
        string file = @"c:\filewithlonglistofnames.txt";
        using (StreamReader f = File.OpenText(file))
        {
            string line = string.Empty;
            bool continueRead = true;

            while (!f.EndOfStream)   // <-- the exception is thrown here
            {
                f.ReadLineAsync()
                    .ToObservable()
                    .ObserveOn(Scheduler.Default)
                    .Subscribe(t =>
                    {
                        Console.WriteLine("custom code to manipulate every line data");
                    });
            }
        }
    }

The exception is an InvalidOperationException. I'm not intimately familiar with the internals of FileStream, but according to the exception message it is being thrown because there is an in-flight asynchronous operation on the stream. The implication is that you must wait for any ReadLineAsync() call to finish before checking EndOfStream.
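
To illustrate that implication, here is a minimal sketch that awaits each ReadLineAsync() call before asking for the next line, so there is never more than one read in flight. The class name SequentialLineReader, the method ProcessLinesAsync and the handleLine callback are hypothetical names used only for illustration. The sketch relies on ReadLineAsync() returning null at the end of the stream, so it does not need to touch EndOfStream at all.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class SequentialLineReader
{
    // Reads the file one line at a time and returns the number of lines handled.
    // handleLine is a hypothetical callback standing in for the per-line work.
    public static async Task<int> ProcessLinesAsync(string path, Action<string> handleLine)
    {
        int count = 0;
        using (StreamReader reader = File.OpenText(path))
        {
            string line;
            // Each read is awaited before the next one starts, so the reader
            // never has two asynchronous operations pending at the same time.
            while ((line = await reader.ReadLineAsync()) != null)
            {
                handleLine(line);
                count++;
            }
        }
        return count;
    }
}
```

This only removes the exception; the lines are still processed one after another on whichever thread resumes the method.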

Matthew Finlay has provided a neat re-working of your code to solve this immediate problem. However, I think it has problems of its own, and there is a bigger issue that needs to be examined. Let's look at the fundamental elements of the problem:

  • You have a very large file.
  • You want to process it asynchronously.

This suggests that you don't want the whole file in memory, you want to be informed when the processing is done, and presumably you want to process the file as fast as possible.

Both solutions use a thread to process each line (the ObserveOn passes each line to a thread from the thread pool). This is not an efficient approach.

Looking at both solutions, there are two possibilities:

  • A. It takes more time on average to read a line from the file than to process it.
  • B. It takes less time on average to read a line from the file than to process it.

A. Reading a line is slower than processing a line

In case A, the system will spend most of its time idle while it waits for file IO to complete. In this scenario, Matthew's solution won't result in memory filling up, but it's worth seeing whether using ReadLines directly in a tight loop produces better results due to less thread contention. (ObserveOn pushing the line to another thread will only buy you something if ReadLines isn't getting lines in advance of your calling MoveNext, which I suspect it does - but test and see!)

B. Reading a line is faster than processing a line

In case B (which I assume is more likely given what you have tried), all those lines will start to queue up in memory and, for a big enough file, you will end up with most of it in memory.

You should note that unless your handler fires off asynchronous code to process a line, all lines will be processed serially, because Rx guarantees that OnNext() handler invocations won't overlap.

The ReadLines() method is great because it returns an IEnumerable<string>, and it is your enumeration of this that drives reading the file. However, when you call ToObservable() on it, it will enumerate as fast as possible to generate the observable events - there is no feedback (known as "backpressure" in reactive programs) in Rx to slow this process down.

The problem is not ToObservable itself - it's ObserveOn. ObserveOn doesn't block the OnNext() call it is invoked on until its subscribers are done with the event - it queues events up as fast as possible against the target scheduler.

If you remove the ObserveOn then, as long as your OnNext handler is synchronous, you'll see each line being read and processed one at a time, because ToObservable() processes the enumeration on the same thread as the handler.
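
As a rough sketch of that last point, the snippet below subscribes to File.ReadLines(path).ToObservable() with no ObserveOn. It assumes the System.Reactive NuGet package is referenced; SynchronousRxReader, ProcessLines and handleLine are hypothetical names. With the default scheduler for ToObservable(), I would expect Subscribe to return only after the last line has been handled, with one line read and one line processed at a time.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;   // assumption: System.Reactive NuGet package is referenced

public static class SynchronousRxReader
{
    // Returns the number of lines handled. handleLine is a hypothetical
    // synchronous callback standing in for the per-line work.
    public static int ProcessLines(string path, Action<string> handleLine)
    {
        int processed = 0;

        File.ReadLines(path)
            .ToObservable()          // no ObserveOn: nothing is queued on another scheduler
            .Subscribe(line =>
            {
                handleLine(line);    // the next line is not read until this returns
                processed++;
            });

        return processed;
    }
}
```

Nothing here runs in parallel, which is the trade-off: memory stays flat because the enumeration cannot get ahead of the handler.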

If this isn't what you want, and you attempt to mitigate it in pursuit of parallel processing by firing an async job in the subscriber - e.g. Task.Run(() => /* process line */) or similar - then things won't go as well as you hope.

Because it takes longer to process a line than to read one, you will create more and more tasks that can't keep pace with the incoming lines. The thread count will gradually increase and you will starve the thread pool.

In this case, Rx isn't really a great fit.

What you probably want is a small number of worker threads (probably one per processor core) that each fetch one line at a time to work on, with a limit on the number of lines of the file held in memory.

A simple approach could be this, which limits the number of lines in memory by using a fixed number of workers. It's a pull-based solution, which is a much better design in this scenario:

    // Static so that they can be called from the static Main shown further down.
    private static Task ProcessFile(string filePath, int numberOfWorkers)
    {
        // ReadLines is lazy: lines are read only as the workers ask for them.
        var lines = File.ReadLines(filePath);

        var parallelOptions = new ParallelOptions {
            MaxDegreeOfParallelism = numberOfWorkers
        };

        return Task.Run(() =>
            Parallel.ForEach(lines, parallelOptions, ProcessFileLine));
    }

    private static void ProcessFileLine(string line)
    {
        /* Your processing logic here */
        Console.WriteLine(line);
    }

And use it like this:

    static void Main()
    {
        var processFile = ProcessFile(
            @"c:\users\james.world\downloads\example.txt", 8);

        Console.WriteLine("Processing file...");
        processFile.Wait();
        Console.WriteLine("Done");
    }

Final notes

There are ways of dealing with back-pressure in Rx (search around for some discussions), but it's not something that Rx handles well, and I think the resulting solutions are less readable than the alternative above. There are also many other approaches you can look at (actor-based approaches such as TPL Dataflow, or LMAX Disruptor style ring buffers for high-performance lock-free approaches), but the core idea of pulling work from queues will be prevalent.
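
To make the idea of pulling work from a queue concrete, here is a minimal sketch built on BlockingCollection<string> from the base class library. It is a stand-in chosen for brevity, not TPL Dataflow or a Disruptor-style ring buffer. The names BoundedQueueProcessor, ProcessFileAsync, maxQueuedLines and processLine are hypothetical. Error handling is left out: the sketch assumes processLine does not throw.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

public static class BoundedQueueProcessor
{
    public static async Task ProcessFileAsync(
        string filePath, int workers, int maxQueuedLines, Action<string> processLine)
    {
        // The bounded capacity caps how many unprocessed lines sit in memory.
        using (var queue = new BlockingCollection<string>(boundedCapacity: maxQueuedLines))
        {
            // Consumers: each worker pulls the next line whenever it is free.
            Task[] consumers = Enumerable.Range(0, workers)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    foreach (string line in queue.GetConsumingEnumerable())
                    {
                        processLine(line);
                    }
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            // Producer: Add blocks while the queue is full, which slows the
            // file read down to the pace of the workers.
            Task producer = Task.Run(() =>
            {
                try
                {
                    foreach (string line in File.ReadLines(filePath))
                    {
                        queue.Add(line);
                    }
                }
                finally
                {
                    // Lets the consumers' foreach loops finish once the queue drains.
                    queue.CompleteAdding();
                }
            });

            await Task.WhenAll(consumers.Concat(new[] { producer }));
        }
    }
}
```

A call such as BoundedQueueProcessor.ProcessFileAsync(path, Environment.ProcessorCount, 1000, ProcessFileLine) would keep at most about a thousand pending lines in memory; the capacity of 1000 is an arbitrary illustrative value.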

Even in this analysis, I am conveniently glossing over what you are doing to process the file, and tacitly assuming that the processing of each line is compute-bound and truly independent. If there is work to merge the results and/or IO activity to store the output, then all bets are off - you will need to examine the efficiency of that side of things carefully too.

In most cases where performing work in parallel is under consideration as an optimization, there are usually so many variables in play that it is best to measure the results of each approach to determine which is best. And measuring is a fine art - be sure to measure realistic scenarios, take averages of many runs of each test, and properly reset the environment between runs (e.g. to eliminate caching effects) in order to reduce measurement error.
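
As a small sketch of the "average of many runs" advice, the helper below times a candidate with Stopwatch and calls an optional reset step before each run. Benchmark, AverageMilliseconds, candidate and resetEnvironment are hypothetical names. What a realistic reset looks like (for example, clearing file caches) depends on your environment and is not shown.

```csharp
using System;
using System.Diagnostics;
using System.Linq;

public static class Benchmark
{
    // Runs candidate the given number of times and returns the mean
    // wall-clock duration in milliseconds.
    public static double AverageMilliseconds(
        Action candidate, int runs, Action resetEnvironment = null)
    {
        if (runs <= 0) throw new ArgumentOutOfRangeException(nameof(runs));

        var timings = new double[runs];
        for (int i = 0; i < runs; i++)
        {
            // The reset happens outside the timed region.
            resetEnvironment?.Invoke();

            Stopwatch stopwatch = Stopwatch.StartNew();
            candidate();
            stopwatch.Stop();

            timings[i] = stopwatch.Elapsed.TotalMilliseconds;
        }
        return timings.Average();
    }
}
```

Comparing, say, AverageMilliseconds(() => ProcessFile(path, 4).Wait(), 10) against the same call with 8 workers gives a first indication, though a mean alone hides the variance between runs.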

