Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* Performance: Optimised `AsyncSeq.pairwise` to use a `hasPrev` flag and a direct `mutable` field instead of wrapping the previous element in `Some`. Previously, each iteration allocated a new `'T option` object on the heap; the new implementation eliminates that allocation entirely, reducing GC pressure for long sequences.
* Bug fix: `AsyncSeq.splitAt` and `AsyncSeq.tryTail` now correctly dispose the underlying enumerator when an exception or cancellation occurs during the initial `MoveNext` call. Previously the enumerator could leak if the source sequence threw during the first few steps.
* Added `AsyncSeq.tryFindIndexBack` and `AsyncSeq.findIndexBack` β€” return the index of the last element satisfying a predicate. Mirrors `Array.tryFindIndexBack` / `Array.findIndexBack`. Async-predicate variants `tryFindIndexBackAsync` and `findIndexBackAsync` are also included.

### 4.16.0

Expand Down
39 changes: 39 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,45 @@
| None -> return raise (System.Collections.Generic.KeyNotFoundException("An element satisfying the predicate was not found in the collection."))
| Some i -> return i }

let tryFindIndexBack f (source : AsyncSeq<'T>) = async {
use ie = source.GetEnumerator()
let! v = ie.MoveNext()
let mutable b = v
let mutable i = 0
let mutable res = None
while b.IsSome do
if f b.Value then res <- Some i
let! next = ie.MoveNext()
b <- next
i <- i + 1
return res }

let tryFindIndexBackAsync f (source : AsyncSeq<'T>) = async {
use ie = source.GetEnumerator()
let! v = ie.MoveNext()
let mutable b = v
let mutable i = 0
let mutable res = None
while b.IsSome do
let! matches = f b.Value
if matches then res <- Some i
let! next = ie.MoveNext()
b <- next
i <- i + 1
return res }

let findIndexBack f (source : AsyncSeq<'T>) = async {
let! result = tryFindIndexBack f source
match result with
| None -> return raise (System.Collections.Generic.KeyNotFoundException("An element satisfying the predicate was not found in the collection."))
| Some i -> return i }

let findIndexBackAsync f (source : AsyncSeq<'T>) = async {
let! result = tryFindIndexBackAsync f source
match result with
| None -> return raise (System.Collections.Generic.KeyNotFoundException("An element satisfying the predicate was not found in the collection."))
| Some i -> return i }

let exists f (source : AsyncSeq<'T>) =
source |> tryFind f |> Async.map Option.isSome

Expand Down Expand Up @@ -2774,7 +2813,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2816 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2816 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
16 changes: 16 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,22 @@ module AsyncSeq =
/// Raises KeyNotFoundException if no matching element is found.
val findIndexAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<int>

/// Asynchronously find the index of the last value in a sequence for which the predicate returns true.
/// Returns None if no matching element is found.
val tryFindIndexBack : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<int option>

/// Asynchronously find the index of the last value in a sequence for which the async predicate returns true.
/// Returns None if no matching element is found.
val tryFindIndexBackAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<int option>

/// Asynchronously find the index of the last value in a sequence for which the predicate returns true.
/// Raises KeyNotFoundException if no matching element is found.
val findIndexBack : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<int>

/// Asynchronously find the index of the last value in a sequence for which the async predicate returns true.
/// Raises KeyNotFoundException if no matching element is found.
val findIndexBackAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<int>

/// Asynchronously determine if there is a value in the sequence for which the predicate returns true
val exists : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<bool>

Expand Down
65 changes: 65 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2097 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2103,7 +2103,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2106 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -3491,6 +3491,71 @@
|> Async.RunSynchronously
Assert.AreEqual(None, result)

// ===== tryFindIndexBack / findIndexBack / tryFindIndexBackAsync / findIndexBackAsync =====

[<Test>]
let ``AsyncSeq.tryFindIndexBack returns index of last matching element`` () =
let result = AsyncSeq.ofSeq [ 1; 2; 3; 2; 1 ] |> AsyncSeq.tryFindIndexBack (fun x -> x = 2) |> Async.RunSynchronously
Assert.AreEqual(Some 3, result)

[<Test>]
let ``AsyncSeq.tryFindIndexBack returns None when no match`` () =
let result = AsyncSeq.ofSeq [ 1; 2; 3 ] |> AsyncSeq.tryFindIndexBack (fun x -> x = 99) |> Async.RunSynchronously
Assert.AreEqual(None, result)

[<Test>]
let ``AsyncSeq.tryFindIndexBack returns None for empty sequence`` () =
let result = AsyncSeq.empty<int> |> AsyncSeq.tryFindIndexBack (fun _ -> true) |> Async.RunSynchronously
Assert.AreEqual(None, result)

[<Test>]
let ``AsyncSeq.tryFindIndexBack returns index of last element when all match`` () =
let result = AsyncSeq.ofSeq [ 1; 2; 3 ] |> AsyncSeq.tryFindIndexBack (fun _ -> true) |> Async.RunSynchronously
Assert.AreEqual(Some 2, result)

[<Test>]
let ``AsyncSeq.findIndexBack returns index of last matching element`` () =
let result = AsyncSeq.ofSeq [ 10; 20; 30; 20; 10 ] |> AsyncSeq.findIndexBack (fun x -> x = 20) |> Async.RunSynchronously
Assert.AreEqual(3, result)

[<Test>]
let ``AsyncSeq.findIndexBack raises KeyNotFoundException when no match`` () =
Assert.Throws<System.Collections.Generic.KeyNotFoundException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ] |> AsyncSeq.findIndexBack (fun x -> x = 99) |> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.tryFindIndexBackAsync returns index of last matching element`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3; 2; 1 ]
|> AsyncSeq.tryFindIndexBackAsync (fun x -> async { return x = 2 })
|> Async.RunSynchronously
Assert.AreEqual(Some 3, result)

[<Test>]
let ``AsyncSeq.tryFindIndexBackAsync returns None when no match`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.tryFindIndexBackAsync (fun x -> async { return x = 99 })
|> Async.RunSynchronously
Assert.AreEqual(None, result)

[<Test>]
let ``AsyncSeq.findIndexBackAsync returns index of last matching element`` () =
let result =
AsyncSeq.ofSeq [ 5; 4; 3; 4; 5 ]
|> AsyncSeq.findIndexBackAsync (fun x -> async { return x = 4 })
|> Async.RunSynchronously
Assert.AreEqual(3, result)

[<Test>]
let ``AsyncSeq.findIndexBackAsync raises KeyNotFoundException when no match`` () =
Assert.Throws<System.Collections.Generic.KeyNotFoundException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.findIndexBackAsync (fun x -> async { return x = 99 })
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== tryFindBack / findBack / tryFindBackAsync / findBackAsync =====

[<Test>]
Expand Down Expand Up @@ -4596,7 +4661,7 @@
let ``AsyncSeq.groupByAsync groups elements by async projection`` () =
let result =
AsyncSeq.ofSeq [1..6]
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4664 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand All @@ -4609,7 +4674,7 @@
let ``AsyncSeq.groupByAsync on empty sequence returns empty`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4677 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)
Expand All @@ -4618,7 +4683,7 @@
let ``AsyncSeq.groupByAsync with all-same key produces single group`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.groupByAsync (fun _ -> async { return "same" })

Check warning on line 4686 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand Down
Loading