Skip to content

Commit 3dde210

Browse files
committed
Add script for retrying all missed job webhooks
1 parent 107b3bb commit 3dde210

File tree

1 file changed

+146
-0
lines changed

1 file changed

+146
-0
lines changed
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
from concurrent.futures import Future, ThreadPoolExecutor, wait
2+
from dataclasses import dataclass
3+
from datetime import timedelta
4+
import itertools
5+
import re
6+
7+
from django.db import connections
8+
import djclick as click
9+
10+
from analytics.core.models import JobFact
11+
from analytics.job_processor.utils import get_gitlab_handle
12+
13+
# The URL of the webhook handler service specified in the GitLab project settings.
14+
# This is the URL in the web_hook_logs table in the GitLab DB.
15+
# NOTE: retry_failed_job_webhooks compares this against web_hook_logs.url with "=",
# so it has to equal the hook URL configured in GitLab exactly.
WEBHOOK_URL = "http://webhook-handler.custom.svc.cluster.local"
16+
17+
18+
@dataclass
class WebhookEvent:
    """One logged delivery of a job webhook, with the IDs needed to resend it."""

    # Creation time of the log entry. NOTE(review): this is filled straight from
    # the DB row, so it may arrive as a datetime rather than a str; it is only
    # used for display in __str__ — confirm before relying on the type.
    created_at: str
    # ID of the CI build (job) the webhook payload refers to.
    build_id: int
    # ID of the GitLab project the build belongs to.
    project_id: int
    # ID of the project hook that produced the event.
    webhook_id: int
    # ID of the logged event itself (the web_hook_logs row).
    webhook_event_id: int

    def __str__(self) -> str:
        """Return a one-line, human-readable summary used in command output."""
        return (
            f"[{self.created_at}] build_id: {self.build_id}, "
            f"project_id: {self.project_id}, "
            f"webhook_id: {self.webhook_id}, "
            f"webhook_event_id: {self.webhook_event_id}"
        )
28+
29+
30+
def retry_webhook(webhook_event: WebhookEvent, dry_run: bool) -> None:
    """Ask GitLab to resend a single logged webhook event.

    Args:
        webhook_event: The event to resend; supplies the project, hook and
            event IDs that make up the API path.
        dry_run: When true, only print what would be resent and make no request.
    """
    if not dry_run:
        click.echo(f"Retrying webhook {webhook_event}")

        # https://docs.gitlab.com/ee/api/project_webhooks.html#resend-a-project-webhook-event
        resend_path = (
            f"/projects/{webhook_event.project_id}"
            f"/hooks/{webhook_event.webhook_id}"
            f"/events/{webhook_event.webhook_event_id}/resend"
        )
        get_gitlab_handle().http_post(resend_path)
        return

    click.echo(f"Would retry webhook {webhook_event}")
41+
42+
43+
# Patterns that pull the IDs out of the logged webhook payload text. Compiled
# once here instead of on every row.
_BUILD_ID_RE = re.compile(r"build_id: (\d+)")
_PROJECT_ID_RE = re.compile(r"project_id: (\d+)")

# Number of web_hook_logs rows fetched from the database cursor per round trip.
_FETCH_BATCH_SIZE = 5000


def _parse_webhook_event(row: tuple) -> "WebhookEvent | None":
    """Build a WebhookEvent from one (created_at, request_data, web_hook_id, id) row.

    Returns None when the request data does not contain both a build ID and a
    project ID, so one unexpected log entry cannot abort the whole run.
    """
    created_at, request_data, webhook_id, webhook_event_id = row
    payload = request_data or ""
    build_match = _BUILD_ID_RE.search(payload)
    project_match = _PROJECT_ID_RE.search(payload)
    if build_match is None or project_match is None:
        return None

    return WebhookEvent(
        created_at=created_at,
        build_id=int(build_match.group(1)),
        project_id=int(project_match.group(1)),
        webhook_id=webhook_id,
        webhook_event_id=webhook_event_id,
    )


def _select_missing_events(cursor, webhook_events: list) -> list:
    """Return the events whose build finished in GitLab but has no JobFact record.

    ``webhook_events`` must be non-empty: it is expanded into an SQL ``IN``
    list. When several events share a build ID, the last one is kept.
    """
    # We only want to retry webhooks for builds that have finished (i.e.
    # status is 'success' or 'failed'). Skipped or cancelled builds are
    # not stored in the analytics DB.
    cursor.execute(
        """
        SELECT
            id
        FROM
            ci_builds
        WHERE
            id IN %s AND
            status IN ('success', 'failed');
        """,
        [tuple(event.build_id for event in webhook_events)],
    )
    finished_jobs: set[int] = set(itertools.chain.from_iterable(cursor.fetchall()))

    # Map build ID -> event for the finished builds only.
    events_by_build_id: dict[int, WebhookEvent] = {
        event.build_id: event
        for event in webhook_events
        if event.build_id in finished_jobs
    }

    # Build IDs that already have a corresponding analytics DB record.
    existing_build_ids: set[int] = set(
        JobFact.objects.filter(job_id__in=set(events_by_build_id)).values_list(
            "job_id", flat=True
        )
    )

    return [
        event
        for build_id, event in events_by_build_id.items()
        if build_id not in existing_build_ids
    ]


@click.command()
@click.option(
    "--seconds",
    type=int,
    # total_seconds() returns a float; the option is declared as an int.
    default=int(timedelta(days=1).total_seconds()),
    help="Retry webhooks that failed in the last N seconds",
)
@click.option(
    "--dry-run",
    is_flag=True,
    default=False,
    help="Print the webhooks that would be retried without actually retrying them",
)
def retry_failed_job_webhooks(seconds: int, dry_run: bool) -> None:
    """Resend recent job webhooks whose builds never reached the analytics DB.

    Scans the GitLab web_hook_logs table for events sent to WEBHOOK_URL in the
    last SECONDS seconds, keeps those whose build finished but has no JobFact
    record, and resends each one (or only prints it with --dry-run).
    """
    # Every submitted retry, keyed by its future so failures can be reported
    # against the event that caused them.
    submitted: dict[Future, WebhookEvent] = {}

    with connections["gitlab"].cursor() as cursor:
        # The cursor below is declared inside an explicit transaction
        # (PostgreSQL requires this for cursors declared without WITH HOLD).
        cursor.execute("BEGIN;")

        cursor.execute(
            """
            DECLARE
                webhook_cursor
            CURSOR FOR
                SELECT
                    created_at,
                    request_data,
                    web_hook_id,
                    id
                FROM
                    public.web_hook_logs
                WHERE
                    url = %s AND
                    created_at > NOW() - INTERVAL %s;
            """,
            [WEBHOOK_URL, f"{seconds} seconds"],
        )

        with ThreadPoolExecutor() as executor:
            while True:
                # Fetch a batch of rows from the cursor
                cursor.execute(
                    "FETCH FORWARD %s FROM webhook_cursor", [_FETCH_BATCH_SIZE]
                )
                rows = cursor.fetchall()
                if not rows:
                    break

                webhook_events: list[WebhookEvent] = []
                for row in rows:
                    event = _parse_webhook_event(row)
                    if event is None:
                        click.echo(
                            f"Skipping webhook log {row[3]}: "
                            "no build_id/project_id found in request data",
                            err=True,
                        )
                        continue
                    webhook_events.append(event)

                # Nothing usable in this batch; an empty IN () list would be
                # invalid SQL, so move on to the next batch.
                if not webhook_events:
                    continue

                # Retry the webhooks for builds missing from the analytics DB
                for event in _select_missing_events(cursor, webhook_events):
                    submitted[executor.submit(retry_webhook, event, dry_run)] = event

        cursor.execute("CLOSE webhook_cursor;")
        cursor.execute("COMMIT;")

    wait(submitted)

    # wait() does not re-raise worker exceptions, so inspect every future
    # explicitly; otherwise a failed resend would go unnoticed.
    failed = 0
    for future, event in submitted.items():
        error = future.exception()
        if error is not None:
            failed += 1
            click.echo(f"Failed to retry webhook {event}: {error}", err=True)

    if failed:
        raise click.ClickException(
            f"{failed} of {len(submitted)} webhook retries failed"
        )

0 commit comments

Comments
 (0)