Skip to content

Commit 61f14e6

Browse files
committed
feat: DataFrame.Typed.Lazy module.
1 parent b83b451 commit 61f14e6

2 files changed

Lines changed: 207 additions & 0 deletions

File tree

dataframe.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ library
114114
DataFrame.Typed.Aggregate,
115115
DataFrame.Typed.TH,
116116
DataFrame.Typed.Expr,
117+
DataFrame.Typed.Lazy,
117118
DataFrame.Typed
118119
build-depends: base >= 4 && <5,
119120
deepseq >= 1 && < 2,

src/DataFrame/Typed/Lazy.hs

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
{-# LANGUAGE AllowAmbiguousTypes #-}
2+
{-# LANGUAGE DataKinds #-}
3+
{-# LANGUAGE FlexibleContexts #-}
4+
{-# LANGUAGE GADTs #-}
5+
{-# LANGUAGE KindSignatures #-}
6+
{-# LANGUAGE ScopedTypeVariables #-}
7+
{-# LANGUAGE TypeApplications #-}
8+
9+
{- |
10+
Module : DataFrame.Typed.Lazy
11+
Copyright : (c) 2025
12+
License : MIT
13+
Stability : experimental
14+
15+
Type-safe lazy query pipelines.
16+
17+
This module combines the compile-time schema tracking of 'TypedDataFrame'
18+
with the deferred execution of 'LazyDataFrame'. Queries are built as a
19+
logical plan tree with phantom-typed schema tracking; execution is deferred
20+
until 'run' is called.
21+
22+
@
23+
{\-\# LANGUAGE DataKinds, TypeApplications, TypeOperators \#-\}
24+
import qualified DataFrame.Typed.Lazy as TL
25+
import DataFrame.Typed (Column)
26+
27+
type Schema = '[Column \"id\" Int, Column \"name\" Text, Column \"score\" Double]
28+
29+
main = do
30+
let query = TL.scanCsv \@Schema \"data.csv\"
31+
& TL.filter (TL.col \@\"score\" TL..>. TL.lit 0.5)
32+
& TL.select \@'[\"id\", \"name\"]
33+
df <- TL.run query -- TypedDataFrame '[Column \"id\" Int, Column \"name\" Text]
34+
print df
35+
@
36+
-}
37+
module DataFrame.Typed.Lazy (
38+
-- * Core type
39+
TypedLazyDataFrame,
40+
41+
-- * Data sources
42+
scanCsv,
43+
scanSeparated,
44+
scanParquet,
45+
fromDataFrame,
46+
fromTypedDataFrame,
47+
48+
-- * Schema-preserving operations
49+
filter,
50+
take,
51+
52+
-- * Schema-modifying operations
53+
derive,
54+
select,
55+
56+
-- * Aggregation
57+
groupBy,
58+
aggregate,
59+
60+
-- * Joins
61+
join,
62+
63+
-- * Sort
64+
sortBy,
65+
66+
-- * Execution
67+
run,
68+
69+
-- * Re-exports for pipeline construction
70+
module DataFrame.Typed.Expr,
71+
module DataFrame.Typed.Types,
72+
SortOrder (..),
73+
) where
74+
75+
import qualified Data.Text as T
76+
import Data.Proxy (Proxy (..))
77+
import GHC.TypeLits (KnownSymbol, Symbol, symbolVal)
78+
import Prelude hiding (filter, take)
79+
80+
import qualified DataFrame.Internal.Column as C
81+
import qualified DataFrame.Internal.Expression as E
82+
import DataFrame.Internal.Schema (Schema)
83+
import DataFrame.Lazy.Internal.DataFrame (LazyDataFrame)
84+
import qualified DataFrame.Lazy.Internal.DataFrame as L
85+
import DataFrame.Lazy.Internal.LogicalPlan (SortOrder (..))
86+
import DataFrame.Operations.Join (JoinType)
87+
import DataFrame.Typed.Expr
88+
import DataFrame.Typed.Freeze (unsafeFreeze)
89+
import DataFrame.Typed.Schema
90+
import DataFrame.Typed.Types
91+
92+
-- | A lazy query with compile-time schema tracking.
93+
newtype TypedLazyDataFrame (cols :: [*]) = TLD {unTLD :: LazyDataFrame}
94+
95+
instance Show (TypedLazyDataFrame cols) where
96+
show (TLD ldf) = "TypedLazyDataFrame { " ++ show ldf ++ " }"
97+
98+
-- | Scan a CSV file with a given schema.
99+
scanCsv ::
100+
Schema ->
101+
T.Text ->
102+
TypedLazyDataFrame cols
103+
scanCsv schema path = TLD (L.scanCsv schema path)
104+
105+
-- | Scan a character-separated file with a given schema.
106+
scanSeparated ::
107+
Char ->
108+
Schema ->
109+
T.Text ->
110+
TypedLazyDataFrame cols
111+
scanSeparated sep schema path = TLD (L.scanSeparated sep schema path)
112+
113+
-- | Scan a Parquet file, directory, or glob pattern with a given schema.
114+
scanParquet ::
115+
Schema ->
116+
T.Text ->
117+
TypedLazyDataFrame cols
118+
scanParquet schema path = TLD (L.scanParquet schema path)
119+
120+
-- | Lift an already-loaded eager 'TypedDataFrame' into a lazy plan.
121+
fromDataFrame :: TypedDataFrame cols -> TypedLazyDataFrame cols
122+
fromDataFrame (TDF df) = TLD (L.fromDataFrame df)
123+
124+
-- | Synonym for 'fromDataFrame'.
125+
fromTypedDataFrame :: TypedDataFrame cols -> TypedLazyDataFrame cols
126+
fromTypedDataFrame = fromDataFrame
127+
128+
-- | Keep rows that satisfy the predicate.
129+
filter :: TExpr cols Bool -> TypedLazyDataFrame cols -> TypedLazyDataFrame cols
130+
filter (TExpr expr) (TLD ldf) = TLD (L.filter expr ldf)
131+
132+
-- | Retain at most @n@ rows.
133+
take :: Int -> TypedLazyDataFrame cols -> TypedLazyDataFrame cols
134+
take n (TLD ldf) = TLD (L.take n ldf)
135+
136+
-- | Add a computed column.
137+
derive ::
138+
forall name a cols.
139+
(KnownSymbol name, C.Columnable a, AssertAbsent name cols) =>
140+
TExpr cols a ->
141+
TypedLazyDataFrame cols ->
142+
TypedLazyDataFrame (Snoc cols (Column name a))
143+
derive (TExpr expr) (TLD ldf) =
144+
TLD (L.derive (T.pack (symbolVal (Proxy @name))) expr ldf)
145+
146+
-- | Retain only the listed columns.
147+
select ::
148+
forall (names :: [Symbol]) cols.
149+
(AllKnownSymbol names, AssertAllPresent names cols) =>
150+
TypedLazyDataFrame cols ->
151+
TypedLazyDataFrame (SubsetSchema names cols)
152+
select (TLD ldf) = TLD (L.select (DataFrame.Typed.Schema.symbolVals @names) ldf)
153+
154+
-- | A typed lazy grouped query.
155+
newtype TypedLazyGrouped (keys :: [Symbol]) (cols :: [*]) = TLG
156+
{ unTLG :: ([T.Text], LazyDataFrame)
157+
}
158+
159+
-- | Group by key columns.
160+
groupBy ::
161+
forall (keys :: [Symbol]) cols.
162+
(AllKnownSymbol keys, AssertAllPresent keys cols) =>
163+
TypedLazyDataFrame cols ->
164+
TypedLazyGrouped keys cols
165+
groupBy (TLD ldf) = TLG (DataFrame.Typed.Schema.symbolVals @keys, ldf)
166+
167+
-- | Aggregate a grouped lazy query.
168+
aggregate ::
169+
forall keys cols aggs.
170+
TAgg keys cols aggs ->
171+
TypedLazyGrouped keys cols ->
172+
TypedLazyDataFrame (Append (GroupKeyColumns keys cols) (Reverse aggs))
173+
aggregate tagg (TLG (keys, ldf)) =
174+
TLD (L.groupBy keys (aggToNamedExprs tagg) ldf)
175+
176+
-- | Join two lazy queries on a shared key column.
177+
join ::
178+
JoinType ->
179+
T.Text ->
180+
T.Text ->
181+
TypedLazyDataFrame left ->
182+
TypedLazyDataFrame right ->
183+
TypedLazyDataFrame left -- TODO: compute join result schema
184+
join jt leftKey rightKey (TLD left) (TLD right) =
185+
TLD (L.join jt leftKey rightKey left right)
186+
187+
-- | Sort the result by column name and direction.
188+
sortBy ::
189+
[(T.Text, SortOrder)] ->
190+
TypedLazyDataFrame cols ->
191+
TypedLazyDataFrame cols
192+
sortBy cols (TLD ldf) = TLD (L.sortBy cols ldf)
193+
194+
-- | Execute the lazy query and return a typed DataFrame.
195+
run ::
196+
forall cols.
197+
(KnownSchema cols) =>
198+
TypedLazyDataFrame cols ->
199+
IO (TypedDataFrame cols)
200+
run (TLD ldf) = unsafeFreeze <$> L.runDataFrame ldf
201+
202+
-- | Convert TAgg to untyped named expressions for the lazy groupBy.
203+
aggToNamedExprs :: TAgg keys cols aggs -> [(T.Text, E.UExpr)]
204+
aggToNamedExprs TAggNil = []
205+
aggToNamedExprs (TAggCons name (TExpr expr) rest) =
206+
(name, E.UExpr expr) : aggToNamedExprs rest

0 commit comments

Comments
 (0)