2626
2727logger = logging .getLogger (__name__ )
2828
29+
2930class Batch :
30- """A wrapper for the NGSI-LD API batch endpoint."""
31+ """A wrapper for the NGSI-LD API batch endpoint."""
3132
3233 def __init__ (self , client : AsyncClient , url : str ):
3334 self ._client = client
3435 self ._session = client .client
3536 self .url = url
3637
3738 @rfc7807_error_handle_async
38- async def _create (
39- self , entities : Sequence [Entity ]) -> BatchResult :
40- r = await self ._session .post (
41- f"{ self .url } /create/" ,
42- content = json .dumps ([e for e in entities ], cls = NgsiEncoder )
43- )
39+ async def _create (self , entities : Sequence [Entity ]) -> BatchResult :
40+ r = await self ._session .post (f"{ self .url } /create/" , content = json .dumps ([e for e in entities ], cls = NgsiEncoder ))
4441 self ._client .raise_for_status (r )
4542 if r .status_code == 201 :
4643 success , errors = r .json (), []
@@ -55,16 +52,14 @@ async def _create(
5552 async def create (self , entities : Sequence [Entity ], batchsize : int = BATCHSIZE ) -> BatchResult :
5653 r = BatchResult ("create" )
5754 for i in range (0 , len (entities ), batchsize ):
58- r += await self ._create (entities [i : i + batchsize ])
55+ r += await self ._create (entities [i : i + batchsize ])
5956 return r
60-
57+
6158 @rfc7807_error_handle_async
6259 async def _upsert (self , entities : Sequence [Entity ], opt : Literal ["replace" , "update" ] = "replace" ) -> BatchResult :
6360 params = {"options" : opt } if opt else {}
6461 r = await self ._session .post (
65- f"{ self .url } /upsert/" ,
66- content = json .dumps ([e for e in entities ], cls = NgsiEncoder ),
67- params = params
62+ f"{ self .url } /upsert/" , content = json .dumps ([e for e in entities ], cls = NgsiEncoder ), params = params
6863 )
6964 self ._client .raise_for_status (r )
7065 if r .status_code == 201 :
@@ -79,21 +74,21 @@ async def _upsert(self, entities: Sequence[Entity], opt: Literal["replace", "upd
7974 return BatchResult ("upsert" , success , errors )
8075
8176 @rfc7807_error_handle_async
82- async def upsert (self , entities : Sequence [Entity ], * , update : bool = False , batchsize : int = BATCHSIZE ) -> BatchResult :
77+ async def upsert (
78+ self , entities : Sequence [Entity ], * , update : bool = False , batchsize : int = BATCHSIZE
79+ ) -> BatchResult :
8380 # default mode (without any option) is "replace", anyway always force the option
84- opt = "update" if update else "replace"
81+ opt = "update" if update else "replace"
8582 r = BatchResult ("upsert" )
8683 for i in range (0 , len (entities ), batchsize ):
87- r += await self ._upsert (entities [i : i + batchsize ], opt )
84+ r += await self ._upsert (entities [i : i + batchsize ], opt )
8885 return r
8986
9087 @rfc7807_error_handle_async
9188 async def _update (self , entities : Sequence [Entity ], opt : Literal ["noOverwrite" ] = None ) -> BatchResult :
9289 params = {"options" : opt } if opt else {}
9390 r = await self ._session .post (
94- f"{ self .url } /update/" ,
95- content = json .dumps ([e for e in entities ], cls = NgsiEncoder ),
96- params = params
91+ f"{ self .url } /update/" , content = json .dumps ([e for e in entities ], cls = NgsiEncoder ), params = params
9792 )
9893 self ._client .raise_for_status (r )
9994 if r .status_code == 204 :
@@ -106,11 +101,13 @@ async def _update(self, entities: Sequence[Entity], opt: Literal["noOverwrite"]
106101 return BatchResult ("update" , success , errors )
107102
108103 @rfc7807_error_handle_async
109- async def update (self , entities : Sequence [Entity ], * , overwrite : bool = True , batchsize : int = BATCHSIZE ) -> BatchResult :
104+ async def update (
105+ self , entities : Sequence [Entity ], * , overwrite : bool = True , batchsize : int = BATCHSIZE
106+ ) -> BatchResult :
110107 opt = "noOverwrite" if not overwrite else None
111108 r = BatchResult ("update" )
112109 for i in range (0 , len (entities ), batchsize ):
113- r += await self ._update (entities [i : i + batchsize ], opt )
110+ r += await self ._update (entities [i : i + batchsize ], opt )
114111 return r
115112
116113 @rfc7807_error_handle_async
@@ -132,5 +129,5 @@ async def _delete(self, entities: Sequence[EntityOrId]) -> BatchResult:
132129 async def delete (self , entities : Sequence [EntityOrId ], batchsize : int = BATCHSIZE ) -> BatchResult :
133130 r = BatchResult ("delete" )
134131 for i in range (0 , len (entities ), batchsize ):
135- r += await self ._delete (entities [i : i + batchsize ])
132+ r += await self ._delete (entities [i : i + batchsize ])
136133 return r
0 commit comments