Skip to content

Commit 7f4e330

Browse files
committed
add a script to feed fastpath from failed measurements s3 bucket
1 parent 22d2544 commit 7f4e330

1 file changed

Lines changed: 155 additions & 0 deletions

File tree

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
#!/usr/bin/env python3
2+
"""
3+
List objects in an S3 bucket using boto3.
4+
Configuration is read from environment variables (see defaults below).
5+
"""
6+
7+
import os
8+
import boto3
9+
import json
10+
import requests
11+
from botocore.exceptions import ClientError, NoCredentialsError, EndpointConnectionError
12+
from concurrent.futures import ThreadPoolExecutor, as_completed
13+
from pathlib import Path
14+
15+
# Configuration from environment (set these in your shell)
16+
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") # required if not using IAM role/profile
17+
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") # required if not using IAM role/profile
18+
ROLE_ARN = os.getenv("ROLE_ARN")
19+
ROLE_SESSION_NAME = os.getenv("ROLE_SESSION_NAME", "assume-role-session")
20+
ROLE_DURATION_SECONDS = int(os.getenv("ROLE_DURATION_SECONDS", "3600")) # optional
21+
AWS_REGION = os.getenv("AWS_REGION", "eu-central-1")
22+
BUCKET_NAME = os.getenv("S3_BUCKET_NAME") # required
23+
PREFIX = os.getenv("S3_PREFIX", "")
24+
MAX_KEYS = int(os.getenv("S3_MAX_KEYS", "1000"))
25+
DEST_ROOT = os.getenv("DOWNLOAD_ROOT", "./s3-downloads")
26+
FASTPATH_API = os.getenv("FASTPATH_API", "")
27+
28+
def assume_role_and_get_credentials(role_arn, session_name, duration_seconds=3600):
29+
"""
30+
Assume the given role and return temporary credentials dict:
31+
{ aws_access_key_id, aws_secret_access_key, aws_session_token }
32+
"""
33+
# Use provided long-term creds or default chain to call STS
34+
sts_kwargs = {"region_name": AWS_REGION,
35+
"aws_access_key_id": AWS_ACCESS_KEY_ID,
36+
"aws_secret_access_key": AWS_SECRET_ACCESS_KEY,
37+
}
38+
sts_client = boto3.client("sts", **sts_kwargs)
39+
resp = sts_client.assume_role(
40+
RoleArn=role_arn,
41+
RoleSessionName=session_name,
42+
DurationSeconds=duration_seconds
43+
)
44+
creds = resp["Credentials"]
45+
return {
46+
"aws_access_key_id": creds["AccessKeyId"],
47+
"aws_secret_access_key": creds["SecretAccessKey"],
48+
"aws_session_token": creds["SessionToken"],
49+
}
50+
51+
def get_s3_client():
52+
"""
53+
Returns an S3 client. If ROLE_ARN is set, assumes that role first and uses
54+
the temporary credentials. Otherwise uses provided credentials or default chain.
55+
"""
56+
client_kwargs = {"region_name": AWS_REGION}
57+
if ROLE_ARN:
58+
try:
59+
temp = assume_role_and_get_credentials(ROLE_ARN, ROLE_SESSION_NAME, ROLE_DURATION_SECONDS)
60+
client_kwargs.update(temp)
61+
except ClientError as e:
62+
print(f"Error assuming role: {e.response.get('Error', {}).get('Message')}")
63+
raise
64+
else:
65+
if AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY:
66+
client_kwargs.update({
67+
"aws_access_key_id": AWS_ACCESS_KEY_ID,
68+
"aws_secret_access_key": AWS_SECRET_ACCESS_KEY,
69+
})
70+
if AWS_SESSION_TOKEN:
71+
client_kwargs["aws_session_token"] = AWS_SESSION_TOKEN
72+
return boto3.client("s3", **client_kwargs)
73+
74+
def walk(s3, bucket_name, start_prefix=''):
75+
"""
76+
Generator like os.walk:
77+
yields (prefix, subprefixes, objects)
78+
- prefix: current prefix ('' or ending with '/')
79+
- subprefixes: list of child prefixes (each ends with '/')
80+
- objects: list of object keys directly under this prefix (no trailing '/')
81+
"""
82+
paginator = s3.get_paginator("list_objects_v2")
83+
page_iter = paginator.paginate(Bucket=bucket_name, Prefix=start_prefix, Delimiter='/')
84+
subprefixes = []
85+
objects = []
86+
for page in page_iter:
87+
subprefixes.extend([cp["Prefix"] for cp in page.get("CommonPrefixes", [])])
88+
for obj in page.get("Contents", []):
89+
key = obj["Key"]
90+
if key == start_prefix:
91+
continue
92+
objects.append(key)
93+
yield start_prefix, subprefixes, objects
94+
for sub in subprefixes:
95+
yield from walk(s3, bucket_name, sub)
96+
97+
def safe_local_path(prefix, key):
98+
# turn S3 key into a local path under DEST_ROOT preserving prefix structure
99+
rel = key[len(prefix):] if prefix and key.startswith(prefix) else key
100+
return os.path.join(DEST_ROOT, prefix.replace('/', os.sep), rel.replace('/', os.sep))
101+
102+
def ensure_parent(path):
103+
os.makedirs(os.path.dirname(path), exist_ok=True)
104+
105+
def process_postcan(s3, bucket, key, local_path):
106+
try:
107+
print("Downloading", key)
108+
s3.download_file(bucket, key, local_path)
109+
p = Path(local_path)
110+
msmt_id = p.stem
111+
with p.open("r", encoding="utf-8") as f:
112+
data = json.load(f)
113+
assert data['format'] == 'json'
114+
content = data.get('content')
115+
endpoint = f"{FASTPATH_API}/{msmt_id}"
116+
try:
117+
resp = requests.post(endpoint, json=content, timeout=30)
118+
resp.raise_for_status()
119+
except requests.RequestException:
120+
raise
121+
assert resp.status_code == 200
122+
assert resp.content == b""
123+
# XXX: remove file from s3 if everything went OK
124+
return key, None
125+
except Exception as e:
126+
try:
127+
if os.path.exists(local_path):
128+
os.remove(local_path)
129+
except Exception as remove_err:
130+
return key, f"remove-failed: {remove_err}; download-failed: {e}"
131+
return key, str(e)
132+
133+
def main():
134+
if not BUCKET_NAME:
135+
print("S3_BUCKET_NAME environment variable is required.")
136+
return
137+
s3 = get_s3_client()
138+
for prefix, subs, objs in walk(s3, BUCKET_NAME, ""):
139+
print(f"PREFIX: {prefix} subdirs={len(subs)} objects={len(objs)}")
140+
with ThreadPoolExecutor(max_workers=50) as _exe:
141+
futures = []
142+
for key in objs:
143+
local_path = safe_local_path(prefix, key)
144+
ensure_parent(local_path)
145+
futures.append(_exe.submit(process_postcan, s3, BUCKET_NAME, key, local_path))
146+
147+
for fut in as_completed(futures):
148+
key, err = fut.result()
149+
if err:
150+
print(f"Failed to process {key}: {err}")
151+
else:
152+
print(f"Submitted {key} to fastpath")
153+
154+
if __name__ == "__main__":
155+
main()

0 commit comments

Comments
 (0)