dasync / asyncenumerable

Defines IAsyncEnumerable, IAsyncEnumerator, ForEachAsync(), ParallelForEachAsync(), and other useful stuff to use with async-await

License: MIT License

C# 100.00%
async foreach parallel enumerator enumerable await csharp asyncenumerator linq async-streams

asyncenumerable's Introduction

Unofficial C# Async Streams

SUMMARY

Makes asynchronous enumeration as easy as its synchronous counterpart. This feature is also known as 'Async Streams' in C# 8.0. The library introduces a familiar and easy-to-use syntax, IAsyncEnumerable, IAsyncEnumerator, ForEachAsync(), ParallelForEachAsync(), and other useful extension methods.

PROBLEM SPACE

Helps to (a) create an element provider, where producing an element can take a lot of time due to a dependency on other asynchronous events (e.g. wait handles, network streams), and (b) create a consumer that processes those elements as soon as they are ready without blocking the thread (the processing is scheduled on a worker thread instead). Bassam Alugili wrote a great explanation of Async Streams in an InfoQ article.

INSTALLATION

Visual Studio's Package Manager Console:

Install-Package AsyncEnumerator -ProjectName MyProject

.NET Core CLI:

dotnet add package AsyncEnumerator

Edit .csproj file:

<ItemGroup>
  <PackageReference Include="AsyncEnumerator" Version="4.0.2" />
</ItemGroup>

EXAMPLE 1 (demonstrates usage only)

    using Dasync.Collections;

    static IAsyncEnumerable<int> ProduceAsyncNumbers(int start, int end)
    {
      return new AsyncEnumerable<int>(async yield => {

        // Just to show that ReturnAsync can be used multiple times
        await yield.ReturnAsync(start);

        for (int number = start + 1; number <= end; number++)
          await yield.ReturnAsync(number);

        // You can break the enumeration loop with the following call:
        yield.Break();

        // This won't be executed due to the loop break above
        await yield.ReturnAsync(12345);
      });
    }

    // Just to compare with synchronous version of enumerator
    static IEnumerable<int> ProduceNumbers(int start, int end)
    {
      yield return start;

      for (int number = start + 1; number <= end; number++)
        yield return number;

      yield break;

      yield return 12345;
    }

    static async Task ConsumeNumbersAsync()
    {
      var asyncEnumerableCollection = ProduceAsyncNumbers(start: 1, end: 10);
      int count = 0;
      await asyncEnumerableCollection.ForEachAsync(async number => {
        await Console.Out.WriteLineAsync($"{number}");
        count++;
        if (count >= 5)
        {
          // You can break the ForEachAsync loop with the following call:
          ForEachAsync.Break();
        }
      });
    }

    // Just to compare with synchronous version of enumeration
    static void ConsumeNumbers()
    {
      var enumerableCollection = ProduceNumbers(start: 1, end: 10);
      foreach (var number in enumerableCollection) {
        Console.Out.WriteLine($"{number}");
      }
    }

EXAMPLE 2 (LINQ-style extension methods)

    using Dasync.Collections;
    
    IAsyncEnumerable<Bar> ConvertGoodFoosToBars(IAsyncEnumerable<Foo> items)
    {
        return items
          .Where(foo => foo.IsGood)
          .Select(foo => Bar.FromFoo(foo));
    }

EXAMPLE 3 (async parallel for-each)

    using Dasync.Collections;
    
    async Task<IReadOnlyCollection<string>> GetStringsAsync(IEnumerable<Uri> uris, HttpClient httpClient, CancellationToken cancellationToken)
    {
        var result = new ConcurrentBag<string>();
        
        await uris.ParallelForEachAsync(
            async uri =>
            {
                var str = await httpClient.GetStringAsync(uri, cancellationToken);
                result.Add(str);
            },
            maxDegreeOfParallelism: 5,
            cancellationToken: cancellationToken);
        
        return result;
    }

WILL THIS MAKE MY APP FASTER?

No and yes. Just making everything async makes your app a tiny bit slower, because it adds overhead in the form of state machines and tasks. However, it helps you make better use of the worker threads in the app, because you no longer block them while waiting for the next element to be produced, i.e. your app behaves better overall when it has multiple such enumerations running in parallel. The best fit for IAsyncEnumerable is reading elements from a network stream, like HTTP + XML (as shown above; SOAP), or a database client implementation where the result of a query is a set of rows.

DIFFERENCES BETWEEN C# 8.0 AND EARLIER VERSIONS

C# 8.0 and .NET Standard 2.1 introduce native support for Async Streams. If you still use an older version of C# and wish to upgrade, the changes should be straightforward.

Change an iterator from this:

using Dasync.Collections;
IAsyncEnumerable<int> AsyncIterator() => new AsyncEnumerable<int>(async yield =>
{
  await yield.ReturnAsync(123);
});

to this:

using System.Collections.Generic;
async IAsyncEnumerable<int> AsyncIterator()
{
  yield return 123;
}

Change a consumer from this:

using Dasync.Collections;
await asyncEnumerable.ForEachAsync(item => 
{
  ...
});

to this:

using System.Collections.Generic;
await foreach (var item in asyncEnumerable)
{
  ...
}
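
For reference, here is a self-contained sketch of the native C# 8.0 form that combines the iterator and the consumer shown above. It uses only the base class library; the names NativeStreamDemo, CountAsync and SumAsync are made up for this illustration and are not part of the library.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class NativeStreamDemo
{
    // Native async iterator: 'yield return' takes the place of 'yield.ReturnAsync'.
    public static async IAsyncEnumerable<int> CountAsync(int start, int end)
    {
        for (int number = start; number <= end; number++)
        {
            await Task.Yield(); // stands in for real asynchronous work
            yield return number;
        }
    }

    // Native consumer: 'await foreach' takes the place of 'ForEachAsync'.
    public static async Task<int> SumAsync(IAsyncEnumerable<int> source)
    {
        int sum = 0;
        await foreach (var item in source)
            sum += item;
        return sum;
    }
}
```

Calling NativeStreamDemo.SumAsync(NativeStreamDemo.CountAsync(1, 4)) adds up the values 1 through 4 as they are produced.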

REFERENCES

GitHub: https://github.com/Dasync/AsyncEnumerable

NuGet.org: https://www.nuget.org/packages/AsyncEnumerator/

License: https://github.com/Dasync/AsyncEnumerable/blob/master/LICENSE

IMPLEMENTATION DETAILS

1: How to use this library?

See examples above. The core code is in System.Collections.Async namespace. You can also find useful extension methods in System.Collections and System.Collections.Generic namespaces for IEnumerable and IEnumerator interfaces.

2: Using CancellationToken

    IAsyncEnumerable<int> ProduceNumbers()
    {
      return new AsyncEnumerable<int>(async yield => {

        await FooAsync(yield.CancellationToken);
      });
    }

3: Always remember about ConfigureAwait(false)

To avoid performance degradation and possible deadlocks in ASP.NET or WPF applications (or any SynchronizationContext-dependent environment), you should always put ConfigureAwait(false) in your await statements:

    IAsyncEnumerable<int> GetValues()
    {
      return new AsyncEnumerable<int>(async yield =>
      {
        await FooAsync().ConfigureAwait(false);

        // Yes, it's even needed for 'yield.ReturnAsync'
        await yield.ReturnAsync(123).ConfigureAwait(false);
      });
    }

4: Clean-up on incomplete enumeration

Imagine such situation:

    IAsyncEnumerable<int> ReadValuesFromQueue()
    {
      return new AsyncEnumerable<int>(async yield =>
      {
        using (var queueClient = CreateQueueClient())
        {
          while (true)
          {
            var message = await queueClient.DequeueMessageAsync();
            if (message == null)
              break;
            
            await yield.ReturnAsync(message.Value);
          }
        }
      });
    }

    Task<int> ReadFirstValueOrDefaultAsync()
    {
      return ReadValuesFromQueue().FirstOrDefaultAsync();
    }

The FirstOrDefaultAsync method reads the first value from the IAsyncEnumerator and then disposes it. However, disposing the AsyncEnumerator does not automatically dispose the queueClient inside the lambda function, because async methods are state machines that must reach a particular state before they can run clean-up code. To make that happen, when you dispose an AsyncEnumerator before reaching the end of the enumeration, it resumes your async lambda function (at await yield.ReturnAsync()) with an AsyncEnumerationCanceledException (which derives from OperationCanceledException). That exception breaks the normal flow of enumeration and drives the underlying state machine to its terminal state, which performs the clean-up, i.e. disposes the queueClient in this case. You don't need to (and shouldn't) catch that exception type, because AsyncEnumerator handles it internally. The same exception is thrown when you call yield.Break().
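
For comparison, the sketch below shows the same "read one value, then dispose" scenario with a native C# 8.0 async iterator, where the compiler generates the clean-up path itself. It does not use this library or its exception-based mechanism; FakeQueueClient, CleanupDemo and the Log list are hypothetical names that exist only to make the order of events observable.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CleanupDemo
{
    // Records the order of events so the clean-up can be observed.
    public static readonly List<string> Log = new List<string>();

    // Hypothetical stand-in for a disposable queue client.
    private sealed class FakeQueueClient : IDisposable
    {
        public FakeQueueClient() => Log.Add("Created");
        public void Dispose() => Log.Add("Disposed");
    }

    public static async IAsyncEnumerable<int> ReadValuesAsync()
    {
        using (new FakeQueueClient())
        {
            for (int i = 1; i <= 3; i++)
            {
                await Task.Yield();
                yield return i;
            }
        }
    }

    // Reads only the first value; disposing the enumerator early runs
    // the pending 'using' clean-up inside the iterator.
    public static async Task<int> ReadFirstAsync()
    {
        await using var enumerator = ReadValuesAsync().GetAsyncEnumerator();
        return await enumerator.MoveNextAsync() ? enumerator.Current : default;
    }
}
```

After ReadFirstAsync completes, Log contains "Created" followed by "Disposed", even though the iterator never reached its end.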

There is another option to do the cleanup on Dispose:

    IAsyncEnumerator<int> GetQueueEnumerator()
    {
      var queueClient = CreateQueueClient();

      return new AsyncEnumerator<int>(async yield =>
      {
        while (true)
        {
          var message = await queueClient.DequeueMessageAsync();
          if (message == null)
            break;
            
          await yield.ReturnAsync(message.Value);
        }
      },
      onDispose: () => queueClient.Dispose());
    }

5: Why is GetAsyncEnumeratorAsync async?

The IAsyncEnumerable.GetAsyncEnumeratorAsync() method is async and returns a Task<IAsyncEnumerator>, although the current implementation of AsyncEnumerable always runs that method synchronously and just returns an instance of AsyncEnumerator. Having interfaces allows you to write your own implementation; the classes mentioned above are just helpers. The initial idea was to support database-like scenarios, where GetAsyncEnumeratorAsync() executes a query first (which internally returns a pointer), and MoveNextAsync() enumerates through the rows (by using that pointer).

6: Returning IAsyncEnumerable vs IAsyncEnumerator

When you implement a method that returns an async-enumerable collection, you can choose to return either IAsyncEnumerable or IAsyncEnumerator; the constructors of the helper classes AsyncEnumerable and AsyncEnumerator are identical. Both interfaces have the same set of useful extension methods, like ForEachAsync.

When you create an 'enumerable', you create a factory that produces 'enumerators', i.e. you can enumerate through a collection many times. On the other hand, creating an 'enumerator' is needed when you can go through a collection only once.

Consider these 2 scenarios:

    // You want to execute the same query against a database many times - you need an 'enumerable'
    IAsyncEnumerable<DbRow> GetItemsFromDatabase()
    {
      return new AsyncEnumerable<DbRow>(async yield =>
      {
        using (var dbReader = DbContext.ExecuteQuery(...))
        {
          while (true)
          {
            DbRow row = await dbReader.ReadAsync();
            if (row == null)
              break;
            await yield.ReturnAsync(row);
          }
        }
      });
    }

    // Assume that you cannot seek in the stream - you need an 'enumerator'
    IAsyncEnumerator<byte> EnumerateBytesInStream(Stream stream)
    {
      return new AsyncEnumerator<byte>(async yield =>
      {
        while (true)
        {
          // ReadByteAsync is illustrative only; System.IO.Stream does not define it
          int value = await stream.ReadByteAsync();
          if (value < 0)
            break;
          await yield.ReturnAsync((byte)value);
        }
      });
    }

7: Where is Reset or ResetAsync?

The Reset method must not be on the IEnumerator interface, and should be considered as deprecated. Create a new enumerator instead. This is the reason why the 'oneTimeUse' flag was removed in version 2 of this library.

8: How can I do synchronous for-each enumeration through IAsyncEnumerable?

You can use extension methods like IAsyncEnumerable.ToEnumerable() to use built-in foreach enumeration, BUT you should never do that! The general idea of this library is to avoid thread-blocking calls on worker threads, where converting an IAsyncEnumerable to IEnumerable will just defeat the whole purpose of this library. This is the reason why such synchronous extension methods are marked with [Obsolete] attribute.

9: What's the difference between ForEachAsync and ParallelForEachAsync?

ForEachAsync goes through a collection and performs an action on each item sequentially. ParallelForEachAsync, on the other hand, runs the action on multiple items at the same time, and the order of completion is not guaranteed. The degree of parallelism is controlled by the maxDegreeOfParallelism argument; however, it does not guarantee that exactly that number of threads is used, because that depends on the thread pool size and its occupancy at that moment. This parallel approach is much better than creating a task for every single item in the collection and then awaiting all of them with Task.WhenAll, because it adds less overhead to the runtime, uses less memory, and helps in throttling-sensitive scenarios.
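
To illustrate the throttling idea in general terms, here is a minimal sketch of a bounded parallel loop that creates only maxDegreeOfParallelism worker tasks instead of one task per item. It is not this library's implementation; ThrottledLoopSketch and ForEachThrottledAsync are hypothetical names, and cancellation and exception aggregation are left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThrottledLoopSketch
{
    // Runs 'body' for every item with at most 'maxDegreeOfParallelism'
    // invocations in flight, using one worker task per slot.
    public static async Task ForEachThrottledAsync<T>(
        IEnumerable<T> source,
        Func<T, Task> body,
        int maxDegreeOfParallelism)
    {
        using var enumerator = source.GetEnumerator();
        var sync = new object();

        async Task WorkerAsync()
        {
            while (true)
            {
                T item;
                lock (sync) // the shared enumerator is not thread-safe
                {
                    if (!enumerator.MoveNext())
                        return;
                    item = enumerator.Current;
                }
                await body(item);
            }
        }

        var workers = new Task[maxDegreeOfParallelism];
        for (int i = 0; i < workers.Length; i++)
            workers[i] = WorkerAsync();

        await Task.WhenAll(workers);
    }
}
```

Each worker pulls the next item only after finishing the previous one, so the number of pending operations never exceeds the limit, whatever the size of the source.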

RELEASE NOTES

4.0.2: Bug-fix: Slow Take extension method. Add AllAsync and AnyAsync extension methods. Add support for SourceLink.

4.0.1: Explicitly add the DLL for .NET Framework 4.6.1 to be compatible with .NET Standard 2.0. No functional changes.

4.0.0: Use interfaces from Microsoft.Bcl.AsyncInterfaces package in .NET Standard 2.0. No functional changes.

3.1.0: Add support for .NET Standard 2.1. Consolidate interface with Microsoft's implementation.

2.2.2: Bug-fix: IAsyncEnumerator.MoveNext must return False on Yield.Break instead of throwing OperationCanceledException.

2.2.0: New LINQ-style extension methods: SelectMany, Append, Prepend, OfType, Concat, Distinct, ToDictionaryAsync, ToLookupAsync, AggregateAsync.

2.1.1: Bug-fix: AsyncEnumerator.OnEnumerationComplete might throw a NullReferenceException when enumeration is canceled. Bug-fix: Batch extension method does not work at all - always throws InvalidOperationException.

2.1.0: New extension methods: Batch, UnionAll, Single, SingleOrDefault, DefaultIfEmpty, Cast. Bug-fix: AsyncEnumerator.MoveNextAsync() must not succeed after Dispose().

2.0.1: Bug-fix: call onDispose when AsyncEnumerator is GC'ed but enumeration hasn't been started. Bug-fix: re-throw base exception instead of AggregateException in blocking synchronous methods.

2.0.0: Revise design of the library: same features, but slight paradigm shift and interface breaking changes.

1.5.0: Add support for .NET Standard, minor improvements.

1.4.2: Add finalizer to AsyncEnumerator and call Dispose in ForEachAsync and ParallelForEachAsync extension methods.

1.4.0: Add new generic type AsyncEnumeratorWithState for performance optimization. Now IAsyncEnumerator<T> is covariant. Add ForEachAsync, ParallelForEachAsync, and LINQ-style extension methods for IAsyncEnumerator.

1.3.0: Significantly improve performance of AsyncEnumerator by reducing thread switching and re-using instances of TaskCompletionSource. Add support for a state object that can be passed into AsyncEnumerable and AsyncEnumerator for performance optimization. Remove CancellationToken from Select/Take/Skip/Where extension methods - fix improper implementation. Move AsyncEnumerationCanceledException out of the generic AsyncEnumerator type. Change default behavior of the ToAsyncEnumerable extension method - now MoveNextAsync will run synchronously by default.

1.2.3: AsyncEnumerationCanceledException is thrown to the async enumeration function when the AsyncEnumerator is disposed before reaching the end of enumeration, which allows it to do the clean-up. Fixed MoveNextAsync() that sometimes threw an exception only when you passed the end of enumeration.

1.2.2: Fix exception propagation in AsyncEnumerator.

1.2.1: New Linq-style extension methods in System.Collections.Async namespace.

1.2.0: Contract breaking changes in ParallelForEachAsync: introduce ParallelForEachException to unify error outcome of the loop.

1.1.0: Add ParallelForEachAsync extension methods for IEnumerable<T> and IAsyncEnumerable<T> in System.Collections.Async namespace.

asyncenumerable's People

Contributors

andres-gimenez, danylofitel, domdom, frjol, kind-serge, mausch, mikebeaton, ppittle, tyrotoxin

asyncenumerable's Issues

AsyncEnumerable interface can return completed Task

return await GetAsyncEnumeratorAsync(cancellationToken);

Minor difference - but possibly relevant as I try to wrap the IAsyncEnumerator<T> to the ones exposed in System.Interactive. The line above is async and awaits the Task rather than just returning the Task returned to it from the IAsyncEnumerator<T> interface implementation. Instead, we can remove both async and await from this method, and allow it to return the Task.FromResult task (which is a minor performance gain also).

There is a minor functional difference: async wraps the method body in a continuation, whereas returning a Task directly does not. This means that exceptions thrown when doing

System.Threading.Tasks.Task Demo(){
	throw new Exception();
	return SomeOtherTask();
}

will be thrown immediately (at a var demoTask = Demo() rather than at the await demoTask). However...

async System.Threading.Tasks.Task Demo2() {
	throw new Exception();
	await SomeOtherTask();
}

Demo2 would execute var demoTask = Demo2() fine, but throw at await demoTask.
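
The difference can be checked with a small self-contained sketch. The helper WhereDoesItThrowAsync and the class name ThrowTimingDemo are made up for this illustration; the two Demo methods mirror the snippets above, with a concrete exception type.

```csharp
using System;
using System.Threading.Tasks;

public static class ThrowTimingDemo
{
    // Not 'async': the exception escapes from the call itself.
    public static Task Demo()
    {
        throw new InvalidOperationException("thrown at call time");
    }

    // 'async': the exception is captured in the returned Task.
    public static async Task Demo2()
    {
        await Task.CompletedTask;
        throw new InvalidOperationException("thrown at await time");
    }

    // Returns "call" if invoking the method throws,
    // "await" if the returned task faults when awaited.
    public static async Task<string> WhereDoesItThrowAsync(Func<Task> method)
    {
        Task task;
        try { task = method(); }
        catch (InvalidOperationException) { return "call"; }

        try { await task; }
        catch (InvalidOperationException) { return "await"; }

        return "never";
    }
}
```

WhereDoesItThrowAsync(ThrowTimingDemo.Demo) yields "call", while WhereDoesItThrowAsync(ThrowTimingDemo.Demo2) yields "await".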

All that said, this code can only throw if AsyncEnumerable.ctor is passed a null argument, and the change in exception behavior would just make IAsyncEnumerable.GetAsyncEnumeratorAsync match the behavior of IAsyncEnumerable<T>.GetAsyncEnumeratorAsync, so it's probably not a breaking change, practically speaking.

Parallel Loop State (feature request)

Current Parallel.ForEach passes a loop state to any method calls. This can be valuable as any break may necessitate immediate stoppage of the loop, which can be done by calling loopstate.Stop().
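
For context, this is roughly how the loop state of the built-in (synchronous) Parallel.ForEach is used. The sketch relies only on the base class library; LoopStateDemo and ProcessUntilNegative are hypothetical names, and the "negative item" stop condition is just an example.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LoopStateDemo
{
    // Returns true if the loop ran to completion, false if Stop() ended it early.
    public static bool ProcessUntilNegative(IEnumerable<int> items)
    {
        ParallelLoopResult result = Parallel.ForEach(items, (item, loopState) =>
        {
            // Ask the loop to cease scheduling new iterations as soon as possible.
            if (item < 0)
                loopState.Stop();
        });

        return result.IsCompleted;
    }
}
```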

AggregateException thrown during synchronous enumeration

In 1.4.2, this would throw an Exception:

new AsyncEnumerable<int>(async yield => { throw new Exception(); }).ToList();

While in 2.0.0, this throws an AggregateException:

new AsyncEnumerable<int>(async yield => { throw new Exception(); }).ToEnumerable().ToList();

I believe it is because calls to Task.ConfigureAwait(false).GetAwaiter().GetResult() were replaced with .Result. It would be better to replace it with Task.GetAwaiter().GetResult(), because then it would not wrap exceptions in AggregateExceptions.
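
The difference between the two blocking calls can be shown in isolation with the base class library alone. BlockingWaitDemo and its members are hypothetical names used only for this sketch.

```csharp
using System;
using System.Threading.Tasks;

public static class BlockingWaitDemo
{
    private static async Task<int> FailAsync()
    {
        await Task.CompletedTask;
        throw new InvalidOperationException("boom");
    }

    // Blocks on a failing task using 'wait' and returns the name of
    // the exception type that the caller observes.
    public static string CatchVia(Func<Task<int>, int> wait)
    {
        try
        {
            wait(FailAsync());
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    // .Result wraps the failure in an AggregateException.
    public static string ViaResult() => CatchVia(t => t.Result);

    // GetAwaiter().GetResult() rethrows the original exception.
    public static string ViaGetResult() => CatchVia(t => t.GetAwaiter().GetResult());
}
```

ViaResult() reports AggregateException, whereas ViaGetResult() reports InvalidOperationException.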

Creating an IAsyncEnumerable from a Task (feature request)

It might be useful to add an IAsyncEnumerable constructor that takes a Task<IEnumerable<T>>, and possibly Task<IAsyncEnumerable<T>>. That would make it so that you don't have to mix async and IAsyncEnumerable if you have async code that gets your IEnumerable.

A (currently-untested) example of what I mean would be something like:

private IAsyncEnumerable<T> CreateAsyncEnumerableFromTask<T>(Func<Task<IEnumerable<T>>> enumerableFunction)
{
	return new AsyncEnumerable<T>(async yield =>
	{
		IEnumerable<T> items = await enumerableFunction();

		foreach (T item in items)
		{
			await yield.ReturnAsync(item);
		}
	});
}

That could probably be implemented much better if it was included in the library itself, maybe as an AsyncEnumerable<T> constructor, or an extension method on Task<IEnumerable<T>>.

An example of how it could be used (really, a distilled version of how I'm going to use the above method) would be:

return CreateAsyncEnumerableFromTask(async delegate
{
	string response = await RunRequest(requestParameters);
	return Deserialize(response);
});

Dispose never called during partial enumeration

Consider the following:

void Main()
{
	GetEnumerable().TakeAsync(2).ForEachAsync(i => i.Dump());
}

public IAsyncEnumerable<int> GetEnumerable()
{
	return new AsyncEnumerable<int>(async yield =>
	{
		using (new MyDisposable())
		{
			await yield.ReturnAsync(1);
			await yield.ReturnAsync(2);
			await yield.ReturnAsync(3);
		}
	});
}

public class MyDisposable : IDisposable
{
	public MyDisposable()
	{
		"Created".Dump();
	}
	
	public void Dispose()
	{
		"Disposed".Dump();
	}
}

The output of this is:

Created
1
2

Compare this to the non-async equivalent:

void Main()
{
	foreach(var i in GetEnumerable().Take(2))
		i.Dump();
}

public IEnumerable<int> GetEnumerable()
{
	using (new MyDisposable())
	{
		yield return 1;
		yield return 2;
		yield return 3;
	}
}

public class MyDisposable : IDisposable
{
	public MyDisposable()
	{
		"Created".Dump();
	}

	public void Dispose()
	{
		"Disposed".Dump();
	}
}

In this case, the output is

Created
1
2
Disposed

So in the async version, dispose is not guaranteed to be called, while it is in the sync version.

Source coming from multiple threads?

Is the sourcing of the stream thread safe? I.e. would the below situation cause any issues I'm unaware of. I have a situation where I need to make a large amount of async requests that generate the async stream, and want to do the sourcing requests in parallel (unordered), and then push into the async stream. Will that create any negative side effects I'm not seeing?

[Fact]
        public void TestAsyncStreams()
        {
            IList<string> urls = new List<string>() {
                "http://www.google.com",
                "http://www.microsoft.com",
                "http://www.github.com"
            };


            var result = new ConcurrentBag<string>();

            var asyncEnumerableCollection = ProduceWebPages(urls, CancellationToken.None);
            asyncEnumerableCollection.ForEachAsync(async page => {
                result.Add(page);
            }).Wait();

            Assert.Equal(urls.Count, result.Count);
        }

        static IAsyncEnumerable<string> ProduceWebPages(IEnumerable<string> urls, CancellationToken cancellationToken)
        {
            return new AsyncEnumerable<string>(async yield => {

                await urls.ParallelForEachAsync(
                    async uri =>
                    {
                        using (var client = new HttpClient())
                        {
                            var str = await client.GetStringAsync(uri);
                            await yield.ReturnAsync(str);
                        }
                    },
                    maxDegreeOfParallelism: 5,
                    cancellationToken: cancellationToken);

                Console.WriteLine("Done");
            });
        }
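
Whether concurrent calls to yield.ReturnAsync are safe is not stated here. One way to sidestep the question is to let the parallel producers write into a channel and expose a single reader as the stream. The sketch below assumes .NET Core 3.0 or later (System.Threading.Channels and native async iterators) and does not use this library; FanInSketch, MergeAsync and ToListAsync are hypothetical names, and unlike the code above it does not limit the degree of parallelism.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class FanInSketch
{
    // Starts one producer per input and funnels all results into one stream.
    // Results arrive in completion order, not in input order.
    public static async IAsyncEnumerable<TOut> MergeAsync<TIn, TOut>(
        IEnumerable<TIn> inputs, Func<TIn, Task<TOut>> produce)
    {
        var channel = Channel.CreateUnbounded<TOut>();

        async Task ProduceOneAsync(TIn input)
        {
            await channel.Writer.WriteAsync(await produce(input));
        }

        // Complete the channel once every producer has finished or failed.
        Task all = Task.WhenAll(inputs.Select(input => ProduceOneAsync(input)).ToList());
        _ = all.ContinueWith(
            t => { channel.Writer.TryComplete(t.Exception); },
            TaskScheduler.Default);

        await foreach (var item in channel.Reader.ReadAllAsync())
            yield return item;
    }

    // Small helper that drains a stream into a list.
    public static async Task<List<T>> ToListAsync<T>(IAsyncEnumerable<T> source)
    {
        var list = new List<T>();
        await foreach (var item in source)
            list.Add(item);
        return list;
    }
}
```

The channel serializes the writes, so the consumer sees one well-formed sequence regardless of which thread produced each element.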

Using ParallelForEachAsync() for non-CPU-intensive tasks

I would like to use a parallel loop for N number of calls to a service, to execute them in batches, and avoid DoS-ing the service.
I had previously created my own version of the loop, using Task.WhenAny() and Task.WhenAll(), but then I needed that in multiple places and thought I should find myself a library for it (unfortunately, I don't use .NET 6 yet, so I don't have its features).

Looking at your ParallelForEachAsync method:
https://github.com/Dasync/AsyncEnumerable/blob/master/src/Extensions/ParallelForEachExtensions.cs#L279
I see that it uses Task.Run() which would be using a new thread.
But I guess that this could be avoided in my case, couldn't it?
So, in other words, your library is not optimized for my use-case, correct?
Or perhaps this code is only used to prevent deadlocks?
Let me know if I am missing anything. :)

I wonder if the Parallel.ForEachAsync() method in .NET 6 works the same way as your library:
https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync?view=net-6.0

Task-Like AsyncEnumerable

Hi there, I just came across your library yesterday, and it seems great! I thought you might be interested in an experiment I put together recently to create a task-like AsyncEnumerable using C# 7's new features. I believe there could be some real synergy in our approaches. Please reach out if interested, as I would be very keen to discuss async sequences!

.NET Core 3 / .NET Standard 2.1

Hi,

I see that the most recent version of the library has been adapted to use the built in IAsyncEnumerable in .NET Core 3, but can I ask what the situation is with .NET Standard 2.1? (from a quick test, looks like it uses its own definition there rather than the built in version?).

Thanks.

Race condition in ParallelForEachAsync when exceptions are thrown

ParallelForEachAsync sometimes does not throw an exception if individual iterations did throw exceptions.

It reproduces with the following code (derived from the scenario where I noticed it):

while (true)
{
    Console.WriteLine();
    IReadOnlyList<int> input = Enumerable.Range(0, 10).ToList();
    ConcurrentQueue<int> output = new ConcurrentQueue<int>();

    try
    {
        await input.ParallelForEachAsync(
            async item =>
            {
                if (item == 0)
                {
                    throw new AggregateException(new Exception("Individual task failed."));
                }

                await Task.Delay(1);
                output.Enqueue(item);
            },
            maxDegreeOfParallelism: 10,
            cancellationToken: default);
    }
    catch (Exception)
    {
        continue;
    }

    Console.WriteLine($"No exception. {input.Count} - {output.Count}.");
}

The cancellation token does not get canceled, and the default values of breakLoopOnException: false and gracefulBreak: true are used. The first task always throws an exception, therefore the expectation is that remaining tasks would finish, and ParallelForEachAsync would throw an exception.
However, the code above will eventually reach the case where exception is not thrown, and only 9 items are added to the queue (Console.WriteLine() statement above).

In the ParallelForEachAsync implementation the main task that schedules individual iterations, as well as continuations of individual tasks all call OnOperationComplete() of ParallelForEachContext. OnOperationComplete() adds exception to the list of tracked exceptions if it was supplied, releases the semaphore and at the very end calls CompleteLoopNow() if all tasks have completed or cancellation was requested (in this specific case I was not cancelling any tasks, so it was only called when all tasks finish).

public void OnOperationComplete(Exception exceptionIfFailed = null)
{
    // Add exception to the list
    // Release the semaphore

    if ((_semaphore.CurrentCount == _maxDegreeOfParallelism + 1) || (IsLoopBreakRequested && !_gracefulBreak))
        CompleteLoopNow();
}

The problem occurs when the last few tasks release the semaphore at the same time, in which case
_semaphore.CurrentCount == _maxDegreeOfParallelism + 1 condition can be evaluated as true for multiple tasks, so CompleteLoopNow() can be called more than once.

public void CompleteLoopNow()
{
    Console.WriteLine("CompleteLoopNow - Start");
    _cancellationTokenRegistration.Dispose();

    try
    {
        if (_semaphore != null)
            _semaphore.Dispose();
    }
    catch
    {
    }

    var exceptions = ReadExceptions();
    var aggregatedException = exceptions?.Count > 0 ? new ParallelForEachException(exceptions) : null;

    if (_cancellationToken.IsCancellationRequested)
    {
        Console.WriteLine("CompleteLoopNow - OperationCanceledException");
        _ = _completionTcs.TrySetException(
            new OperationCanceledException(
                new OperationCanceledException().Message,
                aggregatedException,
                _cancellationToken));
    }
    else if (exceptions?.Count > 0)
    {
        Console.WriteLine("CompleteLoopNow - TrySetException");
        _ = _completionTcs.TrySetException(aggregatedException);
    }
    else
    {
        Console.WriteLine("CompleteLoopNow - TrySetResult");
        _ = _completionTcs.TrySetResult(null);
    }
}

Which means that multiple tasks can also enter ReadExceptions() concurrently.

public List<Exception> ReadExceptions()
{
    Console.WriteLine("ReadExceptions - Start");
    bool lockTaken = false;
    while (!lockTaken)
        _exceptionListLock.Enter(ref lockTaken);
    try
    {
        Console.WriteLine("ReadExceptions - Returning");
        return _exceptionList;
    }
    finally
    {
        _exceptionList = null;
        _exceptionListLock.Exit(useMemoryBarrier: false);
        Console.WriteLine("ReadExceptions - End");
    }
}

However, in the finally block the exception list is set to null, so the first task calling it will get the full list of exceptions back, and subsequent tasks will get null. Then in CompleteLoopNow() it is possible that a task with a null exception list calls TrySetResult() before the task with the correct exception list calls TrySetException().

I debugged with the same Console.WriteLine statements as above, and in cases where ParallelForEachAsync() did not throw I saw the following output

CompleteLoopNow - Start              // Task A entering CompleteLoopNow()
ReadExceptions - Start               // Task A entering ReadExceptions()
ReadExceptions - Returning           // Task A returning a full list of exceptions
ReadExceptions - End                 // Task A setting the list of exceptions to null
CompleteLoopNow - Start              // Task B entering CompleteLoopNow()
ReadExceptions - Start               // Task B entering ReadExceptions()
ReadExceptions - Returning           // Task B returning null as the list of exceptions
ReadExceptions - End                 // Task B setting the list of exceptions to null
CompleteLoopNow - TrySetResult       // Task B setting result on the task completion source since it got null from ReadExceptions()
CompleteLoopNow - TrySetException    // Task A setting exception on the task completion source since it got a non-empty list of exceptions from ReadExceptions()

I'm not sure whether ReadExceptions() needs to reset exception list to null. One possible reason is to prevent a race condition for the case where the loop was canceled, in which case continuations of tasks that are still running can keep adding exceptions to the list, but the same list is returned from ReadExceptions() to CompleteLoopNow(). However, in this case it's possible to return a copy of the exception list from ReadExceptions(), i.e.

public List<Exception> ReadExceptions()
{
    bool lockTaken = false;
    while (!lockTaken)
        _exceptionListLock.Enter(ref lockTaken);
    try
    {
        // Return a copy, so the list being returned will not be modified
        // by tasks that are still running if the loop was canceled
        return new List<Exception>(_exceptionList ?? Enumerable.Empty<Exception>());
    }
    finally
    {
        _exceptionListLock.Exit(useMemoryBarrier: false);
    }
}

Another option is to prevent tasks from re-entering CompleteLoopNow().
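
A minimal sketch of that second option, assuming a simplified stand-in for the loop context: an Interlocked flag lets exactly one caller perform the completion. CompleteOnceSketch is a hypothetical class written for this illustration, not the library's ParallelForEachContext.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for the loop context.
public sealed class CompleteOnceSketch
{
    private readonly TaskCompletionSource<object> _completionTcs =
        new TaskCompletionSource<object>();

    private int _completed; // 0 = still running, 1 = completion already claimed

    public Task Completion => _completionTcs.Task;

    // Returns true only for the single caller that wins the race;
    // every later caller returns false without touching the result.
    public bool CompleteLoopNow(Exception error = null)
    {
        if (Interlocked.Exchange(ref _completed, 1) != 0)
            return false;

        if (error != null)
            _completionTcs.TrySetException(error);
        else
            _completionTcs.TrySetResult(null);

        return true;
    }
}
```

With such a guard only one caller would ever read the exception list, so a second caller could no longer complete the loop with an empty result.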

Why does IEnumerable<Task<T>>.ToAsyncEnumerable() not return IAsyncEnumerable<T> ?

I think it is expected that ToAsyncEnumerable() will return IAsyncEnumerable<T>. But it returns IAsyncEnumerable<Task<T>>.

    // Examples
    GetDataTasks().ToAsyncEnumerable().ForEachAsync( data => { // expected and desired behavior
            Console.WriteLine( data );
    } );

    GetDataTasks().ToAsyncEnumerable().ForEachAsync( async i => { // actual behavior
            Console.WriteLine( await i );
    } );

    // Requests
    private static IEnumerable<Task<int>> GetDataTasks(int start = 0, int count = 1000) {
        return Enumerable.Range( start, count ).Select( GetDataAsync );
    }

    private static async Task<int> GetDataAsync(int id) { // simulate request to web service
        await Task.Delay( rnd.Next( 100, 2000 ) );
        return id;
    }
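One way to get the expected shape is sketched below with a C# 8 async iterator rather than this library's AsyncEnumerable<T> class. The helper name AwaitEachAsync is hypothetical and is not part of the package.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TaskSequenceSketch
{
    // Hypothetical helper: awaits each task in source order and yields its result,
    // turning IEnumerable<Task<T>> into IAsyncEnumerable<T>.
    public static async IAsyncEnumerable<T> AwaitEachAsync<T>(this IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks)
            yield return await task.ConfigureAwait(false);
    }
}
```

Results arrive in source order, not completion order. With a lazy source such as Enumerable.Select, each task is only started when the loop reaches it, so this sketch does not run the requests concurrently.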

Clarify differences between ForEachAsyncExtensions and ParallelForEachExtensions

The thread https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach/11565531#11565531 has an answer that suggests using Task.WhenAll to run multiple asynchronous tasks in parallel (up to a given degree of parallelism, dop), without waiting for the previous task to complete.

public static Task ForEachAsync<T>(
    this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current).ContinueWith(t =>
                    {
                        // observe exceptions
                    });
        }));
}

Your library has ForEachAsyncExtensions and ParallelForEachExtensions, which seem to do similar things.
It would be good to clarify in readme.md or the wiki what the difference is between ForEachAsyncExtensions and ParallelForEachExtensions. It would also be good to clarify how ParallelForEachExtensions differs from Microsoft's Parallel.ForEach.

I would also like to know your opinion: is the ForEachAsync shown above good enough, or does this approach have limitations that some methods of your library address?

Unexpected behaviour after disposing

Consider the following example

var enumerable = new AsyncEnumerable<int>(async yield =>
    {
        await yield.ReturnAsync(1);
        await yield.ReturnAsync(2);
        await yield.ReturnAsync(3);
    });
var enumerator = await enumerable.GetAsyncEnumeratorAsync();
await enumerator.MoveNextAsync();
enumerator.Dispose();
await enumerator.MoveNextAsync(); // What is the expected behavior here?

If you compare this to an IEnumerable that is disposable, such as the following:

IEnumerable<int> GetItems()
{
	using (var disposable = new Disposable())
	{
		yield return 1;
		yield return 2;
		yield return 3;
	}
}

This has the following behavior:

IEnumerable<int> enumerable = GetItems();
var enumerator = enumerable.GetEnumerator();
enumerator.MoveNext(); // true;
enumerator.Current; // 1
enumerator.Dispose();
enumerator.MoveNext(); // false;
enumerator.Current; // 1

So disposing the enumerator acts as if you had reached the end of the enumeration. Whereas if you do the same thing on an IAsyncEnumerator, it will actually restart the enumeration from the start, because the implementation of MoveNextAsync will lazily start enumeration if _yield and _enumerationTask are null, and they are both set to null during Dispose.

SelectMany (feature request)

I'm currently working on some code that runs batches of requests, and returns an IAsyncEnumerable. It would be helpful to have a SelectMany extension method, so that I don't have to flatten batches manually.
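A minimal sketch of what such a method could look like, written as a C# 8 async iterator over the BCL IAsyncEnumerable<T>. The name SelectManySketch is hypothetical, and this is not the library's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SelectManySketchExtensions
{
    // Hypothetical helper: projects each source element to a synchronous
    // sequence (for example a batch) and flattens the results into one stream.
    public static async IAsyncEnumerable<TResult> SelectManySketch<TSource, TResult>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, IEnumerable<TResult>> selector)
    {
        await foreach (var item in source.ConfigureAwait(false))
        {
            foreach (var inner in selector(item))
                yield return inner;
        }
    }
}
```

For a stream of batches, batches.SelectManySketch(batch => batch) would yield the individual items one by one.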

proposal: throw exception if Yield.ReturnAsync called before it's ready

A user of your library posted this question on Stack Overflow.

Basically, they're using AsyncEnumerable<T>.ForEachAsync, but rather than producing items serially, they tried to produce concurrently by creating multiple concurrent generator tasks within the delegate that's passed into the AsyncEnumerable constructor. In other words, they were trying to use this library for the multi producer, single consumer pattern.

The problem is that the incorrect usage did not result in an error, but instead their program hung, and they didn't understand why. I believe it's related to calling Yield.ReturnAsync multiple times, before the previous task returned by ReturnAsync is in a completed state, causing the last call to overwrite information from the previous calls.

I believe if Yield.ReturnAsync throws an InvalidOperationException when it detects a call when the previous call is not completed, then it would allow your users to more quickly detect problems in their own code.
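The kind of check proposed here could be sketched as a small guard object. PendingCallGuard is a hypothetical type, and how it would be wired into Yield.ReturnAsync is an assumption; the idea is only that a second call made before the first one finishes fails fast instead of hanging.

```csharp
using System;
using System.Threading;

// Hypothetical guard: allows one pending call at a time and throws on overlap.
public sealed class PendingCallGuard
{
    private int _pending; // 0 = idle, 1 = a call is in flight

    // Call at the start of the guarded operation.
    public void Enter()
    {
        if (Interlocked.CompareExchange(ref _pending, 1, 0) != 0)
            throw new InvalidOperationException(
                "The operation was called again before the previous call completed.");
    }

    // Call when the guarded operation has completed.
    public void Exit() => Volatile.Write(ref _pending, 0);
}
```

The compare-and-swap keeps the check itself safe when the overlapping calls come from different threads, which is exactly the misuse scenario described above.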

Memory leak

Some "OnEnumerationComplete" tasks are not cleaned up when an AsyncEnumerable is not enumerated to the end.

I ran 4 iterations of the code below and then let it run for another 100 seconds:
[screenshot: aeleak]

Each iteration adds one "Scheduled" task plus 20 objects that stay stuck on the heap.
[screenshot: aemem]

I have found that AsyncEnumeratorWithState.OnEnumerationComplete(...) silently throws a NullReferenceException on "_yield", but where the problem starts is beyond me.

else if (task.IsCanceled)
{
      enumerator._yield.SetCanceled();
}

Example code (.NET 4.7 console application with AsyncEnumerable 2.1.0.0 from NuGet):

static void Main(string[] args)
    {
        //initialize heap ... 
        Enumerate().Wait();//this will not create "Scheduled" Task
        while (true)
        {
            GC.Collect();
            Console.ReadLine();
            Evilize().Wait();//this will add one "Scheduled" task that does not run
            GC.Collect();
            //Take memory snapshot
        }
    }

    public static async Task Enumerate()
    {
        var allEvil = await Evil().GetAsyncEnumeratorAsync();
        while (await allEvil.MoveNextAsync()) { } //enumerating to end does not leak tasks
    }

    public static async Task Evilize()
    {
        var allEvil = await Evil().GetAsyncEnumeratorAsync();
        var oneEvil = allEvil.FirstAsync();
    }

    public static IAsyncEnumerable<string> Evil()
    {
        var input = new AsyncEnumerable<string>(async yield =>
        {
            await Task.Run(async () => await Task.Delay(1));
            await yield.ReturnAsync("Evil");
        });
        return new AsyncEnumerable<string>(async yield =>
        {
            var source = await input.GetAsyncEnumeratorAsync(yield.CancellationToken);
            while (await source.MoveNextAsync(yield.CancellationToken))
            {
                await yield.ReturnAsync("more" + source.Current);
            }
        });
    }

Enumerate an AsyncEnumerable with some degree of parallelism

I first want to mention that your library is great. I think something like this should have been part of the framework, because it makes perfect sense with the shift to async/await.

As an enumerable things are working great. The only thing I have noticed is that enumerating the entire sequence can take a long time because of one async call after another.

Would it be possible to prefetch a certain number of calls ahead in order to save time? I am working on something, but wondered if you had any ideas.

I would imagine I could execute the calls in batches however that would seem to negate the advantages of an async enumerable.
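One possible shape for such prefetching is a bounded buffer between the source and the consumer. The sketch below uses System.Threading.Channels and a C# 8 async iterator instead of this library's types; the name Prefetch and its parameters are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PrefetchSketch
{
    // Hypothetical helper: a background task reads up to `capacity` items ahead
    // of the consumer, so producing and consuming overlap in time.
    public static async IAsyncEnumerable<T> Prefetch<T>(
        this IAsyncEnumerable<T> source,
        int capacity,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateBounded<T>(capacity);
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        var producer = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in source.WithCancellation(cts.Token).ConfigureAwait(false))
                    await channel.Writer.WriteAsync(item, cts.Token).ConfigureAwait(false);
                channel.Writer.Complete();
            }
            catch (Exception ex)
            {
                // Forward failures (including cancellation) to the reader side.
                channel.Writer.Complete(ex);
            }
        });

        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
                yield return item;
        }
        finally
        {
            // Stop the producer if the consumer leaves early, then wait for it.
            cts.Cancel();
            await producer.ConfigureAwait(false);
        }
    }
}
```

This still pulls items from the source one after another; it only hides latency by letting the source run ahead of the consumer. Item order is preserved.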

Aggregate with async accumulator

I have this variant of Aggregate defined in one of my projects, using an async accumulator:

        /// <summary>
        /// Applies an accumulator function over a sequence. The specified seed value is used as the initial accumulator value.
        /// </summary>
        /// <param name="source"></param>
        /// <param name="seed"></param>
        /// <param name="func"></param>
        /// <typeparam name="TSource"></typeparam>
        /// <typeparam name="TAccumulate"></typeparam>
        /// <returns></returns>
        public static async Task<TAccumulate> Aggregate<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> func)
        {
            TAccumulate state = seed;
            await source.ForEachAsync(async e =>
            {
                state = await func(state, e);
            });
            return state;
        }

I noticed that the current Aggregate methods here don't have this overload, do you think it's worth adding it? If so, its implementation should probably be optimised etc.

Take slow for large count value regardless of enumerable size

The Take() extension method is really slow when given large values of count (a billion or higher), even on empty enumerables. For example, the code below takes over 10 seconds to run:

List<int> result = await Enumerable.Empty<int>().ToAsyncEnumerable().Take(1000000000).ToListAsync();

Looking at the source code (IAsyncEnumerableExtensions.cs), it seems that it performs exactly count iterations without breaking out of the loop as soon as MoveNextAsync() returns false:

            private static async Task _enumerate(AsyncEnumerator<TSource>.Yield yield, TakeContext<TSource> context)
            {
                var enumerator = context.Source.GetAsyncEnumerator(yield.CancellationToken);
                try
                {
                    for (var i = context.Count; i > 0; i--)
                    {
                        if (await enumerator.MoveNextAsync().ConfigureAwait(false))
                            await yield.ReturnAsync(enumerator.Current).ConfigureAwait(false);
                    }
                }
                finally
                {
                    await enumerator.DisposeAsync().ConfigureAwait(false);
                }
            }

Either breaking out of the loop in an else clause or checking the condition in the loop control should allow an early exit when there are no more items left.
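The early-exit behavior can be illustrated with a C# 8 async iterator. This is a sketch of the idea, not the change from the pull request, and TakeSketch is a hypothetical name.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TakeSketchExtensions
{
    // Hypothetical helper: yields at most `count` items and stops as soon as
    // either the limit is reached or the source runs out.
    public static async IAsyncEnumerable<T> TakeSketch<T>(
        this IAsyncEnumerable<T> source, int count)
    {
        if (count <= 0)
            yield break;

        await foreach (var item in source.ConfigureAwait(false))
        {
            yield return item;
            if (--count == 0)
                yield break; // limit reached, do not pull another item
        }
        // Falling out of the loop means the source ended before `count` items.
    }
}
```

Because the loop is driven by the source rather than by the counter, an empty source finishes immediately regardless of how large count is.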

Pull request #50

BTW, thanks for the great package!

IAsyncEnumerable should not implement IEnumerable

I have a suggestion about the API based on an issue I have encountered using this library.

In reference to the following two interfaces:
public interface IAsyncEnumerable : IEnumerable
public interface IAsyncEnumerable<T> : IEnumerable<T>, IAsyncEnumerable

Currently they implement IEnumerable and IEnumerable<T>. I don't think they should.

With the current implementation, it is easy to pass an IAsyncEnumerable around as an IEnumerable. If you are in an asp.net request and are therefore using the asp.net synchronization context, you could create a deadlock if you use an AsyncEnumerable as an IEnumerable.

There are two obvious solutions to this problem:

  1. Ensure that you use ConfigureAwait(false) everywhere (and hope that you don't need HttpContext).
  2. Do not use the AsyncEnumerable as an IEnumerable (in order to go 'async all the way').

Both of these approaches are error prone, as there is no guarantee you have done either of them correctly, nor is there a good way of tracking down all of the appropriate places that need to be checked.

I suggest changing the IAsyncEnumerable interfaces such that they do not implement IEnumerable to avoid such issues.

I can see that it is very useful to be able to use an IAsyncEnumerable as an Enumerable in many cases, so I can see why it was done this way. But perhaps it would be better for this to be an extension method to transform an IAsyncEnumerable into an IEnumerable?

eg:
public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> asyncEnumerable)

This has two advantages:

  1. It makes it explicit that you want to enumerate synchronously
  2. It makes it easy to track down places that you are enumerating synchronously.

Point 2 here is particularly useful for the deadlock scenario: if you find a usage of this extension method within an asp.net request, then you know there is a deadlock risk and you may want to enumerate asynchronously instead, propagating the async all the way to your entry point.

What does 'unofficial predecessor version' mean

I presume this is because C#8 is planning to incorporate Async Streams. What do you see as the future of this library? Are the .NET team looking at this library for examples (e.g. authoring an enumerable is much easier here than in the System.Interactive namespace), do you expect this library to live alongside the BCL libraries, or do you expect them to supersede this and it to sunset when the Async Streams are officially released?

Thanks Serge!

Add ForEachAsync break equivalent (Feature Request)

Hi,

I am finding this library extremely useful!

I have found one part of the yield return / foreach pattern which I believe isn't catered for yet.

If you are using foreach (var item in items) then you can put break; inside the loop to end early.

A pattern which I've made up for doing the equivalent is:

try
{
	await items.ForEachAsync(async item =>
	{
		// ... do stuff ...
		if (abort) throw new AsyncEnumerationCanceledException();
	});
}
catch (AsyncEnumerationCanceledException) { }

I wonder if it would be possible to support something like this instead:

await items.ForEachAsync(async item =>
{
	// ... do stuff ...
	if (abort) items.Break();
});

I'm imagining that my proposed IAsyncEnumerable<T>.Break() method would do something like throw a new ForEachAsyncCanceledException which (as well as sending an AsyncEnumerationCanceledException into the enumeration function to allow cleanup, as all exceptions in the action function already do) would be seen and understood by ForEachAsync itself, meaning the outer catch in my manually coded version would no longer be needed.

Does this make sense? Am I missing something which does this already?
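For comparison, with C# 8 and the BCL IAsyncEnumerable<T>, a plain break inside await foreach ends the loop early. The sketch below assumes a C# 8 compiler; CountUntilAsync and shouldStop are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BreakSketch
{
    // Hypothetical helper: processes items until shouldStop returns true and
    // reports how many items were processed before the break.
    public static async Task<int> CountUntilAsync<T>(
        IAsyncEnumerable<T> items, Func<T, bool> shouldStop)
    {
        int processed = 0;
        await foreach (var item in items)
        {
            if (shouldStop(item))
                break; // leaving the loop disposes the enumerator
            processed++;
        }
        return processed;
    }
}
```

No exception needs to be thrown or caught by the caller to leave the loop.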

Question on requirements for build server and ncrunch

I have used this successfully in a .NET Core Web API, where it works like a charm.
I am currently updating some older .NET Framework 4.7 WCF code, converting the class libraries to .NET Standard 2.0.

Everything seems to work in Visual Studio 2019.
The issue comes in with my build server and NCrunch. I discovered that I need to include Microsoft.Bcl.AsyncInterfaces.
But it still fails with the following error, and I wonder if there is something else I need to include.

Type: Failure
Message: Dasync.Collections.ParallelForEachException: One or more errors occurred.
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Bci.Provider.Wcf.Dal.Updates.ProcessProvider.d__2.MoveNext() in D:\Go\A\pipelines\BCI.Provider\src\Wcf\Bci.Provider.Wcf.Dal\Updates\ProcessProvider.cs:line 50
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter1.GetResult()
at ProviderSrvcTests.PutProvider_UpdateTypeServiceCallTest.<SoapCallForUpdateReturnsValidResponseTest>d__1.MoveNext() in D:\Go\A\pipelines\BCI.Provider\src\UnitTests\ProviderSrvcTests\PutProvider_UpdateTypeServiceCallTest.cs:line 40
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Xunit.Sdk.TestInvoker1.<>c__DisplayClass48_1.<b__1>d.MoveNext() in C:\Dev\xunit\xunit\src\xunit.execution\Sdk\Frameworks\Runners\TestInvoker.cs:line 264
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Xunit.Sdk.ExecutionTimer.d__4.MoveNext() in C:\Dev\xunit\xunit\src\xunit.execution\Sdk\Frameworks\ExecutionTimer.cs:line 48
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Xunit.Sdk.ExceptionAggregator.d__9.MoveNext() in C:\Dev\xunit\xunit\src\xunit.core\Sdk\ExceptionAggregator.cs:line 90
at System.Threading.Tasks.ValueTask.get_IsCompleted()
at Dasync.Collections.ParallelForEachExtensions.<>c__DisplayClass1_0`1.<b__0>d.MoveNext()

Change maxdegreeofparallelism during execution?

I'm guessing this isn't possible, but on the slight chance it is, please let me know how to do it! I need to change the number of threads while the parallel loop is running.

My other option is using cancellation tokens, but they seem to be very slow. Is there no fast way to just exit ParallelForEachAsync completely? I might have to resort to an application restart, but was wondering if someone could help me with this.

Performance compared to using Task.WhenAll

Is there any difference in performance between the ParallelForEachAsync extension method and the code snippet below?

public static Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> funcBody, int maxDoP = 4)

        {
            async Task AwaitPartition(IEnumerator<T> partition)
            {
                using (partition)
                {
                    while (partition.MoveNext())
                    { await funcBody(partition.Current); }
                }
            }

            return Task.WhenAll(
                Partitioner
                    .Create(source)
                    .GetPartitions(maxDoP)
                    .AsParallel()
                    .Select(p => AwaitPartition(p)));
        }

Chaining async's

Sorry if this is a question that's already been answered, but I'm trying to figure out if this library is applicable to a bit of fairly simple code. Today I have (converted to a more verbose form for clarity)

IEnumerable<MyEntity> entities = await GetEntitiesAsync();
IEnumerable<Task<MyProcessedEntity>> tasks = entities.Select(ProcessEntityAsync);
IEnumerable<MyProcessedEntity> processedEntities = await Task.WhenAll(tasks); // I realize this could introduce throttling issues
var tasksToGetNames = processedEntities.Select(async e => new { Name = await GetNameAsync(e), Entity = e });
var withNames = await Task.WhenAll(tasksToGetNames);
var byName = withNames.ToDictionary(i => i.Name);
return byName;

public static async Task<MyProcessedEntity> ProcessEntityAsync(MyEntity e) ....

public static async Task<string> GetNameAsync(MyProcessedEntity e) ....

Obviously this leaves a lot to be desired in terms of verbosity, especially since this kind of code is repeated dozens of times throughout my codebase.

Ideally, I'm looking for the means to write something like this:


Dictionary<string, MyProcessedEntity> byName =
    await GetEntitiesAsync().SelectAsync(ProcessEntityAsync)
        .ToDictionaryAsync(e => GetNameAsync(e));

I can easily write SelectAsync and ToDictionaryAsync myself as extension methods, but trying to understand if this library provides that kind of functionality already.

Property-based testing

Have you considered using FsCheck or Hedgehog for testing this project? Property-based testing is ideal for finding bugs in this kind of project.

yield.Break() leads to TaskCanceledException

Look at the following code:

    var e = GetItems().GetAsyncEnumeratorAsync().Result;
    e.MoveNextAsync().Wait();
    e.MoveNextAsync().Wait(); // this line throws: TaskCanceledException: A task was canceled.


    private static IAsyncEnumerable<int> GetItems() {
        return new AsyncEnumerable<int>( GetItems );
    }

    private static async Task GetItems(AsyncEnumerator<int>.Yield yield) {
        await Task.Delay( 1 );
        await yield.ReturnAsync( 0 );

        yield.Break(); // break

        await Task.Delay( 1 );
        await yield.ReturnAsync( 1 );
    }

IEnumerator.MoveNext() simply returns false when the enumeration is ended with yield break. So I consider this a bug, or at least strange behavior.

Use the interfaces in a .NET Core 3.1 project

So, I was looking for an IAsyncEnumerable alternative, that does work under .NET Standard 1.4 and found this library.

However, when I reference it from my consuming .NET Core 3.1 test project, it gives me

Error CS7069 Reference to type 'IAsyncEnumerator<>' claims it is defined in 'AsyncEnumerable', but it could not be found.

or when I use IAsyncEnumerable

Error CS7069 Reference to type 'IAsyncEnumerator<>' claims it is defined in 'AsyncEnumerable', but it could not be found.

The error makes sense to me, because .NET Core 3.1 does support async streaming by default. Is there a way to use it in a .NET Core 3.1 test project anyways? (I would like to not change anything in my consuming project for this)

Enumerators are not automatically Enumerated during WebAPI serialization.

Consider the following WebAPI code:

// GET api/values
public IEnumerable<string> Get()
{
    for (int i = 0; i < 5; i++)
    {
        yield return "Value" + i;
    }
}

When the URL is called from a browser you get:

[
  "Value0",
  "Value1",
  "Value2",
  "Value3",
  "Value4"
]

Using the equivalent IAsyncEnumerable code does not work the same:

// GET api/values
public IAsyncEnumerable<string> Get()
{
    return new AsyncEnumerable<string>
    (
        async yield =>
        {
            for (int i = 0; i < 5; i++)
            {
                await yield.ReturnAsync("Value" + i);
            }
        }
    );
}

When the URL is called from a browser you get:

{}

How to break on ParallelForEachAsync

I want to exit the ParallelForEachAsync loop as soon as a result comes back false:

                await numbers.ParallelForEachAsync(async number =>
                             {
                                 var isSuccess = await DoSomethingAsync(number).ConfigureAwait(false);

                                 switch (isSuccess)
                                 {
                                     case true:
                                         DoMoreSomething();

                                         break;

                                     case false:
                                         // How to exit ParallelForEachAsync
                                         break;
                                 }
                             }, 0, cancellationToken)
                             .ConfigureAwait(false);
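One common way to get a "break" out of a parallel loop is a linked CancellationTokenSource that the loop body cancels itself. The sketch below demonstrates the pattern with the BCL's Parallel.ForEachAsync (.NET 6 or later) as a stand-in, not with this library's ParallelForEachAsync; whether the library reacts to cancellation in the same way is an assumption. RunUntilFailureAsync and tryProcessAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelBreakSketch
{
    // Hypothetical helper: runs tryProcessAsync for each item in parallel and
    // stops scheduling new items once any call returns false.
    // Returns the number of items that succeeded.
    public static async Task<int> RunUntilFailureAsync<T>(
        IEnumerable<T> items,
        Func<T, Task<bool>> tryProcessAsync,
        int maxDegreeOfParallelism,
        CancellationToken cancellationToken = default)
    {
        using var breakSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
            CancellationToken = breakSource.Token
        };

        int succeeded = 0;
        try
        {
            await Parallel.ForEachAsync(items, options, async (item, token) =>
            {
                if (await tryProcessAsync(item).ConfigureAwait(false))
                    Interlocked.Increment(ref succeeded);
                else
                    breakSource.Cancel(); // acts as the "break"
            }).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            // The loop was stopped by our own break, not by the caller's token.
        }
        return succeeded;
    }
}
```

Bodies that are already running when the break happens still run to completion; only the scheduling of further items stops.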

the type IAsyncEnumerable exists in both AsyncInterfaces and interactive.async

I am working on a .NET Core 2.1 application. After installing this package, an error started to pop up.
The error says "The type 'IAsyncEnumerable' exists in both 'Microsoft.Bcl.AsyncInterfaces' and 'System.Interactive.Async'".

Microsoft.Bcl.AsyncInterfaces was introduced by this package.

How do I fix this issue?

Provide example for reading file async

Dear Dasync,

Would you mind providing an example of reading a big file (for example, a 200 MB Excel file from the hard drive of a PC) and returning a stream?

My idea is to divide the file into batches, so that multiple batches can be read asynchronously.

I have read your documentation but don't know how to apply it.

Thank you.

Best Regards,

Accidentally using Dasync's async methods on an EF DbSet blows up the world

using Dasync.Collections;

var booking = _dbContext.Bookings.SingleAsync(b => b.Id == id, cancellationToken);

This will result in the booking sometimes being grabbed, but in most cases it seems like much more happens than you want, e.g. it crawls all the relations, consumes a tonne of memory and eventually crashes your app.

I'm not sure how we can really solve this other than an analyzer that detects usage on a DbSet<T> and says, "No, don't do this".

cc: @anthony-keller

Using of GetAwaiter().GetResult() from ValueTask<T> in EnumeratorAdapter

Hi there!

For some reason I used the ToEnumerable() extension to iterate an IAsyncEnumerable synchronously.
And, sure enough, I got an exception from this line of code.

Documentation tells us that this is not valid call for ValueTask: "The following operations should never be performed on a ValueTask<TResult> instance: ... Using .Result or .GetAwaiter().GetResult() when the operation hasn't yet completed, or using them multiple times".

I know, that I shouldn't use this extension, but I think this still should work correctly.

Also found some more usages of GetAwaiter().GetResult() from ValueTask in your code:

  • This Dispose code in AsyncEnumerator, which can be a bigger problem than the one I found.
  • This extension method, AsyncEnumerableAdapterExtensions.MoveNext, left in for backward compatibility.

asyncEnumerable.ParallelForEachAsync() natively in C# 8

Hi,

In your README.md you give an example of how to go from asyncEnumerable.ForEachAsync() to native C# 8. I was wondering if it is possible to get the behavior of ParallelForEachAsync() natively in C# 8 as well.

Thank you
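C# 8 itself has no parallel counterpart to await foreach, but bounded parallelism over an IAsyncEnumerable<T> can be sketched with a SemaphoreSlim. This is an illustration under simplifying assumptions (minimal error handling, all started tasks kept in a list), not the library's implementation; ForEachParallelSketchAsync is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelSketchExtensions
{
    // Hypothetical helper: runs `body` for each item with at most
    // maxDegreeOfParallelism invocations in flight at the same time.
    public static async Task ForEachParallelSketchAsync<T>(
        this IAsyncEnumerable<T> source,
        Func<T, Task> body,
        int maxDegreeOfParallelism,
        CancellationToken cancellationToken = default)
    {
        var gate = new SemaphoreSlim(maxDegreeOfParallelism);
        var running = new List<Task>();

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            // Wait for a free slot before starting the next item.
            await gate.WaitAsync(cancellationToken).ConfigureAwait(false);
            running.Add(RunAsync(item));
        }

        // Surfaces the first failure, if any, once every started task is done.
        await Task.WhenAll(running).ConfigureAwait(false);

        async Task RunAsync(T item)
        {
            try { await body(item).ConfigureAwait(false); }
            finally { gate.Release(); }
        }
    }
}
```

A call such as items.ForEachParallelSketchAsync(ProcessAsync, 4) would process up to four items concurrently, where ProcessAsync stands for any Func<T, Task>.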

ConfigureAwait(false) in asp.net _core_ not needed

Great library by the way! Super helpful! Thank you so much :)

Just a small thing BTW, Step 3 of the implementation documentation says:

To avoid performance degradation and possible dead-locks in ASP.NET or WPF applications (or any SynchronizationContext-dependent environment), you should always put ConfigureAwait(false) in your await statements

Apparently in asp.net core this is no longer needed:

https://blog.stephencleary.com/2017/03/aspnetcore-synchronization-context.html

It won't do any harm, but it might be useful to note that this is only necessary in aspnet framework or wpf apps.

ParallelForEachAsync not awaiting

Hi, shouldn't you define ParallelForEachAsync as async, with an await on Task.Run? Without this, when I await ParallelForEachAsync it actually continues to the next code block, because the internal Task.Run is not awaited.

async public static Task ParallelForEachAsync<T>(
    this IAsyncEnumerable<T> collection,
    Func<T, long, Task> asyncItemAction,
    int maxDegreeOfParalellism,
    bool breakLoopOnException,
    bool gracefulBreak,
    CancellationToken cancellationToken = default)
{
    ....
    await Task.Run ....
    await context.CompletionTask;
}

Async disposal (Feature Request)

While creation of the enumerator (GetAsyncEnumeratorAsync) and moving next (MoveNextAsync) are asynchronous, disposal of the enumerator is currently synchronous and cannot be done in an asynchronous way.

Given that you are likely to be doing asynchronous operations if you are using this library in the first place, it is likely that you will have some asynchronous cleanup to do after you finish enumerating. I have certainly encountered this in my usage of this library.

Consider this example:

public static IAsyncEnumerable<int> GetRows()
{
    return new AsyncEnumerable<int>(async yield =>
    {
        IConnection connection = CreateConnection();
        try
        {
            var rows = await connection.GetDataAsync();
            foreach (var row in rows)
                await yield.ReturnAsync(row);
        }
        finally
        {
            await connection.Close();
        }
    });
}

interface IConnection
{
    Task Close();
    Task<IEnumerable<int>> GetDataAsync();
}

Calling Dispose() on an enumerator in this example will set the exception on the TaskCompletionSource that is being awaited on by yield.ReturnAsync. So the dispose action of closing the connection will indeed be run, but this will happen sometime after Dispose() has been called, and nothing will be awaiting on the result of this. This may be problematic in cases where you are relying on the underlying resources being disposed when you have finished with the enumerator.

So what do you think about adding some way of awaiting the result of asynchronously disposing the enumerator?

I suggest that IAsyncEnumerator continues to implement IDisposable, but add another property to IAsyncEnumerator like the following:

public interface IAsyncEnumerator : IDisposable
{
    object Current { get; }
    Task<bool> MoveNextAsync(CancellationToken cancellationToken = default(CancellationToken));
    Task Disposal { get; }
}

This allows you to await the disposal if you wish, but if you don't care about when the disposal happens, you can choose not to await anything (i.e. the current behavior). This approach is described in this blog post https://blog.stephencleary.com/2013/03/async-oop-6-disposal.html

Any extension methods on IAsyncEnumerable should probably be awaiting the disposal of the underlying enumerator as well.

If you think this is a good idea, I would be happy to work on this.

Should MoveNextAsync/ReturnAsync be checking if the CancellationToken has been cancelled?

I came across this behavior while writing some unit tests around some code that deals with batched data and I found it surprising. Just wondering what your take on it is?

Linqpad sample

async Task Main()
{
  using (var cts = new CancellationTokenSource())
    await PartialEnumerationCancelTest(cts)
        .ForEachAsync(x => x.Dump(), cts.Token);
}

IAsyncEnumerable<int> PartialEnumerationCancelTest(CancellationTokenSource cts)
{
  return new AsyncEnumerable<int>(async yield =>
  {
    foreach (var i in Enumerable.Range(0, 10))
    {
      // simulate a cancel part way through the enumeration
      if (i == 4) cts.Cancel();
      await yield.ReturnAsync(i);
    }
  });  
}

Linqpad Output
0 1 2 3 4 5 6 7 8 9

Linqpad Output I would have expected
0 1 2 3 -- A task was canceled.

The same thing happens when manually enumerating. I would have expected MoveNextAsync to be checking the cancellation token.

using (var cts = new CancellationTokenSource())
using (var enumerator = await PartialEnumerationCancelTest(cts).GetAsyncEnumeratorAsync(cts.Token))
  while (await enumerator.MoveNextAsync(cts.Token))
    enumerator.Current.Dump();
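The expected output can be obtained when the producer checks the token itself before each item. The sketch below shows this with a C# 8 async iterator rather than this library's AsyncEnumerable<T>; Numbers and beforeYield are hypothetical names, and beforeYield only exists to simulate the mid-enumeration cancel.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Hypothetical producer: calls beforeYield(i), then checks the token
    // before handing each of the numbers 0..9 to the consumer.
    public static async IAsyncEnumerable<int> Numbers(
        Action<int> beforeYield,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (int i = 0; i < 10; i++)
        {
            beforeYield(i);
            // Throws OperationCanceledException once cancellation is requested.
            token.ThrowIfCancellationRequested();
            yield return i;
            await Task.Yield();
        }
    }
}
```

Calling Numbers(i => { if (i == 4) cts.Cancel(); }, cts.Token) and enumerating with await foreach yields 0, 1, 2, 3 and then throws OperationCanceledException.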

On the performance of Yield.SetCanceled function

My English is not great, so I will try to describe my question briefly.
I read your source code. When I use the IAsyncEnumerableExtensions.Take function to return a specified number of contiguous elements, I noticed that ReturnAsync in the Yield class returns a TaskCompletionSource object called _resumeEnumerationTcs. When AsyncEnumeratorWithState is disposed, it sets the yield to canceled, which causes _resumeEnumerationTcs to set an exception. I found that the exception reduces performance. Is it deliberately designed like this? Could it be replaced by TrySetCanceled? Thanks~

yield.ReturnAsync inside ParallelForEachAsync

We have trouble using ParallelForEachAsync to start a number of parallel running async webrequests (actually they are Azure Blob Storage retrievals) and have the result being returned using yield.ReturnAsync.

The problem is that before the first result is returned, multiple parallel webrequests have been started and also completed, yet only the last of the results are returned to the consumer (that iterates using ForEachAsync).

The producer:

    public static IAsyncEnumerable<(MemoryStream, string)> Stream(this IEnumerable<string> blobNames,
        IBlobManager blobManager,
        CloudBlobContainer container, int maxDegreeOfParallelism = 5)
    {
        return new AsyncEnumerable<(MemoryStream, string)>(async yield =>
        {
            var cancellationToken1 = yield.CancellationToken;

            await blobNames.ParallelForEachAsync(async blobName =>
            {
                Console.WriteLine($"Trying to download blob: {blobName}");

               //TODO: Try-catch, what happens if one fail?
                var memoryStream = await blobManager
                    .DownloadBlobAsStreamAsync(container, blobName, cancellationToken1)
                    .ConfigureAwait(false);

                // Return immediately instead of waiting for all the blobs to complete
                await yield.ReturnAsync((memoryStream, blobName)).ConfigureAwait(false);
            }, maxDegreeOfParallelism, cancellationToken1).ConfigureAwait(false);
        });
    }

The consumer:

        var blobNames = MyFactory.BuildBlobNames(from, to);

        var asyncEnumerable = blobNames.Stream(BlobManager, Container, 4);

        // Assert
        var concurrentList = new ConcurrentBag<string>();
        await asyncEnumerable.ForEachAsync(async tuple =>
        {
            using (var ms = tuple.Item1)
            {
                var decoded = Encoding.UTF8.GetString(ms.ToArray());
                //TODO: Convert to text to assert the content
                concurrentList.Add(decoded);
                Console.WriteLine($"Blob: {tuple.Item2}. Content: {decoded}");
            }
        }, cancellationToken).ConfigureAwait(false);

What did we do wrong?
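If concurrent ReturnAsync calls are indeed the cause, one way around it is to let the parallel workers write into a channel and have a single reader yield the results. The sketch below uses System.Threading.Channels and a C# 8 async iterator instead of this library's types; SelectParallelSketch and its parameters are hypothetical, and cancellation is omitted for brevity.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class FanInSketch
{
    // Hypothetical helper: several workers produce results concurrently,
    // while only this iterator (a single consumer) yields them.
    public static async IAsyncEnumerable<TResult> SelectParallelSketch<TSource, TResult>(
        IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> selector,
        int maxDegreeOfParallelism)
    {
        var work = new ConcurrentQueue<TSource>(source);
        var results = Channel.CreateUnbounded<TResult>();

        Task[] workers = Enumerable.Range(0, maxDegreeOfParallelism)
            .Select(worker => Task.Run(async () =>
            {
                while (work.TryDequeue(out var item))
                {
                    var result = await selector(item).ConfigureAwait(false);
                    await results.Writer.WriteAsync(result).ConfigureAwait(false);
                }
            }))
            .ToArray();

        // Close the channel once all workers finish, forwarding any failure.
        _ = Task.WhenAll(workers).ContinueWith(
            t => results.Writer.Complete(t.Exception),
            TaskScheduler.Default);

        await foreach (var result in results.Reader.ReadAllAsync().ConfigureAwait(false))
            yield return result;
    }
}
```

Results are yielded in completion order, so each item should carry whatever identifies it (as the blob name does in the tuple above).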

System is a reserved namespace

We would love to use this library but we can't since it is currently in the System namespace.

According to the C# standards (defined by ECMA Standards 334 ) in section 16.2 (page 252) it states:

Fully qualified names beginning with System. are reserved for use by the Standard Library (§Annex D).

Therefore, to be compliant with the .NET standards and to support a wider audience please move the great library to its own namespace.

Thank you.
