Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit ba23cca

Browse files
charlesccychen authored and aaltay committed
Make retry logic idempotent in GcsIO.delete and GcsIO.rename
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=124180161
1 parent 4d1fedb commit ba23cca

2 files changed

Lines changed: 116 additions & 17 deletions

File tree

google/cloud/dataflow/io/gcsio.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -139,38 +139,54 @@ def glob(self, pattern):
139139

140140
@retry.with_exponential_backoff() # Using retry defaults from utils/retry.py
141141
def delete(self, path):
142-
"""Deletes the given gcs object.
142+
"""Deletes the object at the given GCS path.
143143
144144
Args:
145145
path: GCS file path pattern in the form gs://<bucket>/<name>.
146146
"""
147147
bucket, object_path = parse_gcs_path(path)
148148
request = storage.StorageObjectsDeleteRequest(bucket=bucket,
149149
object=object_path)
150-
self.client.objects.Delete(request)
150+
try:
151+
self.client.objects.Delete(request)
152+
except HttpError as http_error:
153+
if http_error.status_code == 404:
154+
# Return success when the file doesn't exist anymore for idempotency.
155+
return
156+
raise
151157

152158
@retry.with_exponential_backoff() # Using retry defaults from utils/retry.py
153-
def rename(self, src, dst):
154-
"""Renames the given gcs object from src to dst.
159+
def copy(self, src, dest):
160+
"""Copies the given GCS object from src to dest.
155161
156162
Args:
157163
src: GCS file path pattern in the form gs://<bucket>/<name>.
158-
dst: GCS file path pattern in the form gs://<bucket>/<name>.
164+
dest: GCS file path pattern in the form gs://<bucket>/<name>.
159165
"""
160166
src_bucket, src_path = parse_gcs_path(src)
161-
dst_bucket, dst_path = parse_gcs_path(dst)
167+
dest_bucket, dest_path = parse_gcs_path(dest)
162168
request = storage.StorageObjectsCopyRequest(sourceBucket=src_bucket,
163169
sourceObject=src_path,
164-
destinationBucket=dst_bucket,
165-
destinationObject=dst_path)
170+
destinationBucket=dest_bucket,
171+
destinationObject=dest_path)
166172
self.client.objects.Copy(request)
167-
request = storage.StorageObjectsDeleteRequest(bucket=src_bucket,
168-
object=src_path)
169-
self.client.objects.Delete(request)
173+
174+
# We intentionally do not decorate this method with a retry, since the
175+
# underlying copy and delete operations are already idempotent operations
176+
# protected by retry decorators.
177+
def rename(self, src, dest):
178+
"""Renames the given GCS object from src to dest.
179+
180+
Args:
181+
src: GCS file path pattern in the form gs://<bucket>/<name>.
182+
dest: GCS file path pattern in the form gs://<bucket>/<name>.
183+
"""
184+
self.copy(src, dest)
185+
self.delete(src)
170186

171187
@retry.with_exponential_backoff() # Using retry defaults from utils/retry.py
172188
def exists(self, path):
173-
"""Returns whether the given gcs object exists.
189+
"""Returns whether the given GCS object exists.
174190
175191
Args:
176192
path: GCS file path pattern in the form gs://<bucket>/<name>.

google/cloud/dataflow/io/gcsio_test.py

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121
import threading
2222
import unittest
2323

24-
from google.cloud.dataflow.io import gcsio
2524

25+
import httplib2
26+
27+
from google.cloud.dataflow.io import gcsio
28+
from apitools.base.py.exceptions import HttpError
2629
from google.cloud.dataflow.internal.clients import storage
2730

2831

@@ -53,14 +56,24 @@ class FakeGcsObjects(object):
5356

5457
def __init__(self):
5558
self.files = {}
59+
# Store the last generation used for a given object name. Note that this
60+
# has to persist even past the deletion of the object.
61+
self.last_generation = {}
5662
self.list_page_tokens = {}
5763

5864
def add_file(self, f):
5965
self.files[(f.bucket, f.object)] = f
66+
self.last_generation[(f.bucket, f.object)] = f.generation
6067

6168
def get_file(self, bucket, obj):
6269
return self.files.get((bucket, obj), None)
6370

71+
def delete_file(self, bucket, obj):
72+
del self.files[(bucket, obj)]
73+
74+
def get_last_generation(self, bucket, obj):
75+
return self.last_generation.get((bucket, obj), 0)
76+
6477
def Get(self, get_request, download=None): # pylint: disable=invalid-name
6578
f = self.get_file(get_request.bucket, get_request.object)
6679
if f is None:
@@ -77,10 +90,8 @@ def get_range_callback(start, end):
7790

7891
def Insert(self, insert_request, upload=None): # pylint: disable=invalid-name
7992
assert upload is not None
80-
generation = 1
81-
f = self.get_file(insert_request.bucket, insert_request.name)
82-
if f is not None:
83-
generation = f.generation + 1
93+
generation = self.get_last_generation(insert_request.bucket,
94+
insert_request.name) + 1
8495
f = FakeFile(insert_request.bucket, insert_request.name, '', generation)
8596

8697
# Stream data into file.
@@ -95,6 +106,26 @@ def Insert(self, insert_request, upload=None): # pylint: disable=invalid-name
95106

96107
self.add_file(f)
97108

109+
def Copy(self, copy_request): # pylint: disable=invalid-name
110+
src_file = self.get_file(copy_request.sourceBucket,
111+
copy_request.sourceObject)
112+
assert src_file is not None
113+
generation = self.get_last_generation(copy_request.destinationBucket,
114+
copy_request.destinationObject) + 1
115+
dest_file = FakeFile(copy_request.destinationBucket,
116+
copy_request.destinationObject,
117+
src_file.contents, generation)
118+
self.add_file(dest_file)
119+
120+
def Delete(self, delete_request): # pylint: disable=invalid-name
121+
# Here, we emulate the behavior of the GCS service in raising a 404 error
122+
# if this object does not exist.
123+
if self.get_file(delete_request.bucket, delete_request.object):
124+
self.delete_file(delete_request.bucket, delete_request.object)
125+
else:
126+
raise HttpError(httplib2.Response({'status': '404'}), '404 Not Found',
127+
'https://fake/url')
128+
98129
def List(self, list_request): # pylint: disable=invalid-name
99130
bucket = list_request.bucket
100131
prefix = list_request.prefix or ''
@@ -154,6 +185,58 @@ def setUp(self):
154185
self.client = FakeGcsClient()
155186
self.gcs = gcsio.GcsIO(self.client)
156187

188+
def test_delete(self):
189+
file_name = 'gs://gcsio-test/delete_me'
190+
file_size = 1024
191+
192+
# Test deletion of non-existent file.
193+
self.gcs.delete(file_name)
194+
195+
self._insert_random_file(self.client, file_name, file_size)
196+
self.assertTrue(gcsio.parse_gcs_path(file_name) in
197+
self.client.objects.files)
198+
199+
self.gcs.delete(file_name)
200+
201+
self.assertFalse(gcsio.parse_gcs_path(file_name) in
202+
self.client.objects.files)
203+
204+
def test_copy(self):
205+
src_file_name = 'gs://gcsio-test/source'
206+
dest_file_name = 'gs://gcsio-test/dest'
207+
file_size = 1024
208+
self._insert_random_file(self.client, src_file_name,
209+
file_size)
210+
self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
211+
self.client.objects.files)
212+
self.assertFalse(gcsio.parse_gcs_path(dest_file_name) in
213+
self.client.objects.files)
214+
215+
self.gcs.copy(src_file_name, dest_file_name)
216+
217+
self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
218+
self.client.objects.files)
219+
self.assertTrue(gcsio.parse_gcs_path(dest_file_name) in
220+
self.client.objects.files)
221+
222+
def test_rename(self):
223+
src_file_name = 'gs://gcsio-test/source'
224+
dest_file_name = 'gs://gcsio-test/dest'
225+
file_size = 1024
226+
self._insert_random_file(self.client, src_file_name,
227+
file_size)
228+
self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
229+
self.client.objects.files)
230+
self.assertFalse(gcsio.parse_gcs_path(dest_file_name) in
231+
self.client.objects.files)
232+
233+
self.gcs.rename(src_file_name, dest_file_name)
234+
235+
self.assertFalse(gcsio.parse_gcs_path(src_file_name) in
236+
self.client.objects.files)
237+
self.assertTrue(gcsio.parse_gcs_path(dest_file_name) in
238+
self.client.objects.files)
239+
157240
def test_full_file_read(self):
158241
file_name = 'gs://gcsio-test/full_file'
159242
file_size = 5 * 1024 * 1024 + 100

0 commit comments

Comments
 (0)