Skip to content

Commit 2b6f2fe

Browse files
authored
fix(git-integration): target commit from an outdated branch (CM-717) (#3480)
1 parent 841833e commit 2b6f2fe

9 files changed

Lines changed: 151 additions & 34 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- Remove branch column from git.repositories table
2+
ALTER TABLE git.repositories
3+
DROP COLUMN IF EXISTS "branch";
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
ALTER TABLE git.repositories
2+
ADD COLUMN "branch" VARCHAR(255) DEFAULT NULL;
3+
4+
-- Add comment for documentation
5+
COMMENT ON COLUMN git.repositories."branch" IS 'The default branch being tracked for this repository (e.g., main, master, develop).';

services/apps/git_integration/src/crowdgit/database/crud.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ async def acquire_onboarding_repo() -> Repository | None:
5252
LIMIT 1
5353
FOR UPDATE SKIP LOCKED
5454
)
55-
RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt"
55+
RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch"
5656
"""
5757
return await acquire_repository(
5858
onboarding_repo_sql_query, (RepositoryState.PROCESSING, RepositoryState.PENDING)
@@ -101,7 +101,7 @@ async def acquire_recurrent_repo() -> Repository | None:
101101
LIMIT 1
102102
FOR UPDATE SKIP LOCKED
103103
)
104-
RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt"
104+
RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch"
105105
"""
106106
states_to_exclude = (RepositoryState.PENDING, RepositoryState.PROCESSING)
107107
return await acquire_repository(
@@ -134,17 +134,18 @@ async def release_repo(repo_id: str):
134134
return str(result)
135135

136136

137-
async def update_last_processed_commit(repo_id: str, commit_hash: str):
137+
async def update_last_processed_commit(repo_id: str, commit_hash: str, branch: str | None = None):
138138
"""
139-
Release repository lock (lockedAt) after processing
139+
Update last processed commit and optionally the branch after processing
140140
"""
141141
sql_query = """
142142
UPDATE git.repositories
143143
SET "lastProcessedCommit" = $1,
144+
"branch" = $2,
144145
"updatedAt" = NOW()
145-
WHERE id = $2
146+
WHERE id = $3
146147
"""
147-
result = await execute(sql_query, (commit_hash, repo_id))
148+
result = await execute(sql_query, (commit_hash, branch, repo_id))
148149
return str(result)
149150

150151

services/apps/git_integration/src/crowdgit/models/clone_batch.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ class CloneBatchInfo(BaseModel):
1919
default=None,
2020
description="The edge commit from the previous batch, used to track progress during incremental processing.",
2121
)
22+
clone_with_batches: bool = Field(
23+
default=True, description="Whether repo is cloned with batches"
24+
)
2225

2326
class Config:
2427
"""Pydantic configuration"""

services/apps/git_integration/src/crowdgit/models/repository.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ class Repository(BaseModel):
2828
None,
2929
description="Timestamp of when the repository maintainer processing was last executed",
3030
)
31+
branch: str | None = Field(
32+
None,
33+
description="The default branch being tracked for this repository (e.g., main, master, develop)",
34+
)
3135
created_at: datetime = Field(..., description="Creation timestamp")
3236
updated_at: datetime = Field(..., description="Last update timestamp")
3337

services/apps/git_integration/src/crowdgit/services/clone/clone_service.py

Lines changed: 86 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@
1313
from crowdgit.errors import CommandExecutionError, CrowdGitError
1414
from crowdgit.models import CloneBatchInfo, Repository, ServiceExecution
1515
from crowdgit.services.base.base_service import BaseService
16-
from crowdgit.services.utils import get_default_branch, get_repo_name, run_shell_command
16+
from crowdgit.services.utils import (
17+
get_default_branch,
18+
get_remote_default_branch,
19+
get_repo_name,
20+
run_shell_command,
21+
)
1722

1823
DEFAULT_CLONE_BATCH_DEPTH = 10
1924
DEFAULT_STORAGE_OPTIMIZATION_THRESHOLD_MB = 2000
@@ -47,9 +52,9 @@ async def _check_if_final_batch(self, path: str, target_commit_hash: str | None)
4752
except CommandExecutionError:
4853
return False
4954

50-
async def _init_minimal_clone(self, path: str, remote: str) -> None:
55+
async def _perform_minimal_clone(self, path: str, remote: str) -> None:
5156
"""
52-
Inits minimal clone of depth=1
57+
Perform minimal clone of depth=1
5358
"""
5459
# increasing post buffer to avoid RPC failed error
5560
await run_shell_command(
@@ -140,6 +145,7 @@ async def _update_batch_info(
140145
For batched clones (clone_with_batches=True): Checks if target commit reached or full history fetched.
141146
"""
142147
batch_info.repo_path = repo_path
148+
batch_info.clone_with_batches = clone_with_batches
143149

144150
if batch_info.is_first_batch:
145151
# Set latest commit only from first batch
@@ -271,16 +277,86 @@ async def _calculate_batch_depth(self, repo_path: str, remote: str) -> int:
271277
)
272278
return calculated_depth
273279

274-
async def _clone_repo(self, repo_path: str, remote: str):
275-
"""Perform full repository clone for new repositories that haven't been processed before"""
280+
async def _perform_full_clone(self, repo_path: str, remote: str):
281+
"""Perform full repository clone"""
282+
self.logger.info(f"Performing full clone for repo {remote}...")
276283
await run_shell_command(["git", "clone", remote, repo_path], cwd=repo_path)
277284
self.logger.info(f"Successfully completed full clone of repository: {remote}")
278285

286+
async def has_default_branch_changed(self, remote: str, saved_branch: str | None) -> bool:
287+
"""Check if the default branch has changed compared to the saved branch
288+
Args:
289+
remote: The remote repository URL
290+
saved_branch: The branch currently saved in the database (can be None)
291+
Returns:
292+
True if default branch has changed and requires re-cloning, False otherwise
293+
"""
294+
try:
295+
remote_default_branch = await get_remote_default_branch(remote)
296+
297+
if remote_default_branch is None:
298+
self.logger.warning(f"Could not determine default branch for {remote}")
299+
return False
300+
301+
if saved_branch is None:
302+
self.logger.info(f"No saved branch for {remote} assuming it's not changed")
303+
return False
304+
305+
if saved_branch != remote_default_branch:
306+
self.logger.info(
307+
f"Branch changed for {remote}: saved='{saved_branch}' vs remote='{remote_default_branch}'"
308+
)
309+
return True
310+
311+
self.logger.debug(f"Branch unchanged for {remote}: {saved_branch}")
312+
return False
313+
314+
except Exception as e:
315+
self.logger.error(f"Error validating branch for {remote}: {e}")
316+
# On error, assume no change to avoid unnecessary re-cloning
317+
return False
318+
319+
async def determine_clone_strategy(
320+
self, repo_path: str, remote: str, branch: str | None, last_processed_commit: str | None
321+
) -> bool:
322+
"""Determine whether to use full clone or minimal clone strategy based on repository state.
323+
324+
Args:
325+
repo_path: Local path where repository will be cloned
326+
remote: Remote repository URL (e.g., 'https://github.com/user/repo')
327+
branch: Current saved branch name or None for new repositories
328+
last_processed_commit: Last processed commit hash or None for new repositories
329+
330+
Returns: (clone_with_batches)
331+
bool: False for full clone (clone_with_batches=False), True for minimal clone (clone_with_batches=True)
332+
333+
Strategy:
334+
- Full clone: New repositories (last_processed_commit=None) or branch changed
335+
- Minimal clone: Existing repositories with unchanged branch for incremental processing
336+
"""
337+
338+
self.logger.info(
339+
f"Starting clone decision for {remote} (branch: {branch}, last_commit: {last_processed_commit})"
340+
)
341+
342+
default_branch_changed = await self.has_default_branch_changed(remote, branch)
343+
344+
if not last_processed_commit or default_branch_changed:
345+
reason = "new repository" if not last_processed_commit else "branch changed"
346+
self.logger.info(f"Performing full clone for {remote} - reason: {reason}")
347+
await self._perform_full_clone(repo_path, remote)
348+
return False
349+
350+
self.logger.info(
351+
f"Performing minimal clone for {remote} - existing repository with unchanged branch"
352+
)
353+
await self._perform_minimal_clone(repo_path, remote)
354+
return True
355+
279356
async def clone_batches_generator(
280357
self,
281358
repository: Repository,
282359
working_dir_cleanup: bool | None = False,
283-
clone_with_batches: bool | None = True,
284360
) -> AsyncIterator[CloneBatchInfo]:
285361
"""
286362
Async generator that yields CloneBatchInfo for repository cloning.
@@ -302,22 +378,15 @@ async def clone_batches_generator(
302378
remote=remote,
303379
is_final_batch=False,
304380
is_first_batch=True,
305-
total_commits_count=0,
306381
)
307382
try:
308383
temp_repo_path = tempfile.mkdtemp(prefix=f"{get_repo_name(remote)}_")
309-
batch_info.repo_path = temp_repo_path
310384
batch_start_time = time.time()
311385

312-
if not clone_with_batches:
313-
self.logger.info(f"Performing full clone for repo: {remote}")
314-
await self._clone_repo(temp_repo_path, remote)
315-
else:
316-
# Incremental processing: start with minimal clone and fetch in batches
317-
self.logger.info(
318-
f"Performing incremental batched clone for existing repository: {remote}"
319-
)
320-
await self._init_minimal_clone(temp_repo_path, remote)
386+
clone_with_batches = await self.determine_clone_strategy(
387+
temp_repo_path, remote, repository.branch, repository.last_processed_commit
388+
)
389+
if clone_with_batches:
321390
batch_depth = await self._calculate_batch_depth(temp_repo_path, remote)
322391
await self._update_batch_info(
323392
batch_info, temp_repo_path, repository.last_processed_commit, clone_with_batches

services/apps/git_integration/src/crowdgit/services/commit/commit_service.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ async def process_single_batch_commits(
115115
self,
116116
repository: Repository,
117117
batch_info: CloneBatchInfo,
118-
clone_with_batches: bool,
119118
) -> None:
120119
"""
121120
Process commits from a cloned batch.
@@ -147,7 +146,7 @@ async def process_single_batch_commits(
147146
)
148147
raw_commits = await self._execute_git_log(
149148
batch_info.repo_path,
150-
clone_with_batches,
149+
batch_info.clone_with_batches,
151150
batch_info.prev_batch_edge_commit,
152151
batch_info.edge_commit,
153152
)

services/apps/git_integration/src/crowdgit/services/utils.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,42 @@ def get_repo_name(remote: str) -> str:
8989
return "-".join(parts)
9090

9191

92+
async def get_remote_default_branch(remote_url: str) -> str | None:
93+
"""Get the default branch of a remote repository without cloning
94+
95+
Args:
96+
remote_url: The URL of the remote repository.
97+
98+
Returns:
99+
The default branch name, or None if unable to determine.
100+
"""
101+
try:
102+
# Use git ls-remote to get the symbolic reference for HEAD
103+
output = await run_shell_command(["git", "ls-remote", "--symref", remote_url, "HEAD"])
104+
105+
# Parse the output to find the symbolic reference
106+
# Output format: "ref: refs/heads/main\tHEAD\n<commit_hash>\tHEAD"
107+
lines = output.strip().split("\n")
108+
for line in lines:
109+
if line.startswith("ref: refs/heads/"):
110+
# Extract branch name from "ref: refs/heads/main"
111+
return line.split("refs/heads/")[-1].split("\t")[0]
112+
113+
# Fallback: if symbolic ref not available, try common branches
114+
for branch in ["main", "master"]:
115+
try:
116+
await run_shell_command(["git", "ls-remote", "--heads", remote_url, branch])
117+
return branch
118+
except CommandExecutionError:
119+
continue
120+
121+
return None
122+
123+
except CommandExecutionError as e:
124+
logger.warning(f"Failed to get remote default branch for {remote_url}: {e}")
125+
return None
126+
127+
92128
async def get_default_branch(repo_path: str) -> str:
93129
"""Get the default branch of the repository using local repo
94130

services/apps/git_integration/src/crowdgit/worker/repository_worker.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
QueueService,
2020
SoftwareValueService,
2121
)
22-
from crowdgit.services.utils import get_repo_name
22+
from crowdgit.services.utils import get_default_branch, get_repo_name
2323
from crowdgit.settings import WORKER_ERROR_BACKOFF_SEC, WORKER_POLLING_INTERVAL_SEC
2424

2525

@@ -142,27 +142,24 @@ async def _process_single_repository(self, repository: Repository):
142142
try:
143143
repo_name = get_repo_name(repository.url)
144144
self._bind_repository_context(repository, repo_name)
145-
# Use full clone for new repositories (no last_processed_commit), batched clone for existing ones
146-
clone_with_batches = True if repository.last_processed_commit else False
147-
logger.info(
148-
f"Starting repository cloning for {repo_name} with batching={clone_with_batches}"
149-
)
150145
async for batch_info in self.clone_service.clone_batches_generator(
151146
repository,
152147
working_dir_cleanup=True,
153-
clone_with_batches=clone_with_batches,
154148
):
155149
logger.info(f"Clone batch info: {batch_info}")
156150
if batch_info.is_first_batch:
157151
await self.software_value_service.run(repository.id, batch_info.repo_path)
158152
await self.maintainer_service.process_maintainers(repository, batch_info)
159153
await self.commit_service.process_single_batch_commits(
160-
repository, batch_info, clone_with_batches
154+
repository,
155+
batch_info,
161156
)
162157

163158
if batch_info.is_final_batch:
164159
await update_last_processed_commit(
165-
repo_id=repository.id, commit_hash=batch_info.latest_commit_in_repo
160+
repo_id=repository.id,
161+
commit_hash=batch_info.latest_commit_in_repo,
162+
branch=await get_default_branch(batch_info.repo_path),
166163
)
167164

168165
logger.info("Incremental processing completed successfully")

0 commit comments

Comments
 (0)