89 changes: 89 additions & 0 deletions samples/scripts/pushNotifications/README.md
@@ -0,0 +1,89 @@
# Pub/Sub Bucket Notifications - Local Testing Plan

This folder contains scripts for locally testing DIVE's bucket notification handling, as implemented by the `bucket_notifications` plugin (the `/api/v1/bucket_notifications/gcs` endpoint and its routing API).

It provides:

- A MinIO setup script to stand up a local S3-compatible endpoint, create a bucket, and seed sample data (based on `samples/scripts/assetStoreImport/minIOConfig.py`).
- A script to modify bucket contents to simulate new data arriving (file uploads/overwrites/deletes) that should trigger re-imports when wired to notifications.
- A script to send mock GCS Pub/Sub push messages to the server endpoint for end-to-end testing without GCP.

## Background

See `docs/Deployment-Storage.md` → "S3 and MinIO Mirroring" and "Pub/Sub notifications". The server exposes:

- `POST /api/v1/bucket_notifications/gcs` for receiving push-delivered Pub/Sub messages. On `OBJECT_FINALIZE`, the server locates a matching read-only S3/GCS assetstore and re-imports the relevant path beneath the configured mount folder.
- `POST /api/v1/bucket_notifications/routing/:id` for configuring `notification_router_rules` on an assetstore's folder mount root.

Relevant server code: `server/bucket_notifications/views.py` and `server/bucket_notifications/models.py`.
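
For reference, a push-delivered notification has roughly the following shape (mirroring what `send_gcs_push.py` in this folder constructs; all field values here are illustrative):

```json
{
  "message": {
    "attributes": {
      "bucketId": "dive-sample-data",
      "objectId": "new-sequence/image_0001.png",
      "eventType": "OBJECT_FINALIZE"
    },
    "data": "e30K",
    "messageId": "1234567890",
    "publishTime": "2024-01-01T00:00:00Z"
  },
  "subscription": "projects/<project>/subscriptions/<subscription>"
}
```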

## Prerequisites

- Docker installed and running
- Python 3.8+
- DIVE backend running and reachable (e.g., http://localhost:8010)
- An S3/MinIO assetstore configured in Girder Admin, pointing to the MinIO bucket created by these scripts:
  - Type: Amazon S3
  - Service: `http://<minio-host>:9000` (use the actual IP printed by the setup script)
  - Bucket: `dive-sample-data`
  - Read only: checked
  - Region: `us-east-1` (or any string)
  - Access Key / Secret Key: values printed by the setup script
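
If you prefer scripting this step, Girder's `POST /assetstore` endpoint can create the same assetstore (a sketch; `type=2` is Girder's S3 assetstore type, and the parameter names should be verified against your Girder version):

```bash
curl -X POST \
  -H "Girder-Token: <admin_token>" \
  "http://localhost:8010/api/v1/assetstore" \
  --data-urlencode "name=minio-sample-data" \
  --data-urlencode "type=2" \
  --data-urlencode "bucket=dive-sample-data" \
  --data-urlencode "service=http://<minio-host>:9000" \
  --data-urlencode "accessKeyId=<access_key>" \
  --data-urlencode "secret=<secret_key>" \
  --data-urlencode "readOnly=true" \
  --data-urlencode "region=us-east-1"
```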

After creating the assetstore, set notification routing via:

```bash
curl -X POST \
  -H "Girder-Token: <admin_token>" \
  -H "Content-Type: application/json" \
  http://localhost:8010/api/v1/bucket_notifications/routing/<assetstore_id> \
  -d '{"data": {"folderId": "<mount_root_folder_id>"}}'
```

## Scripts

1) `setup_minio.py`

Launches MinIO and seeds data into the `dive-sample-data` bucket, printing the MinIO IP and test credentials. Adapted from `samples/scripts/assetStoreImport/minIOConfig.py`.

2) `modify_bucket.py`

Performs object operations to simulate new data arrival (example invocations below):
- Upload a new folder with images
- Overwrite an existing object
- Optionally delete an object
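
Example invocations (the object paths here are placeholders):

```bash
uv run --script modify_bucket.py overwrite \
  --bucket dive-sample-data \
  --object existing-sequence/image_0001.png \
  --local-file ./replacement.png

uv run --script modify_bucket.py delete \
  --bucket dive-sample-data \
  --object existing-sequence/image_0001.png
```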

3) `send_gcs_push.py`

Sends a mock GCS Pub/Sub push payload to DIVE at `/api/v1/bucket_notifications/gcs` with `eventType=OBJECT_FINALIZE`, targeting a given `bucketId` and `objectId`.

## End-to-End Test Flow

1. Run `setup_minio.py` to start MinIO and seed data.
2. Configure an S3 assetstore in Girder that points at the printed MinIO service and bucket.
3. Configure notification routing on that assetstore to point at your chosen mount root folder.
4. Run `modify_bucket.py` to upload new files or folders into the MinIO bucket.
5. For local-only testing, run `send_gcs_push.py` with a matching bucket and object path to simulate a Pub/Sub push. Example:

```bash
uv run --script setup_minio.py -d ./sample

uv run --script modify_bucket.py upload \
  --bucket dive-sample-data \
  --prefix new-sequence/ \
  --local-path ./newData

uv run --script send_gcs_push.py \
  --server http://localhost:8010 \
  --bucket dive-sample-data \
  --object "new-sequence/"
```

Check the DIVE UI: new datasets should appear or update under the configured mount folder as the server processes the import triggered by the notification.
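
Beyond the UI, you can confirm the import ran by listing the mount folder's children through Girder's standard REST API (substitute your own token and folder id):

```bash
curl -H "Girder-Token: <admin_token>" \
  "http://localhost:8010/api/v1/folder?parentType=folder&parentId=<mount_root_folder_id>"
```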

## Notes

- The server only handles `OBJECT_FINALIZE` notifications for automatic imports.
- For true GCP testing, you can configure a Pub/Sub push subscription to `https://<server>/api/v1/bucket_notifications/gcs` as described in `docs/Deployment-Storage.md`; a sketch of that setup follows.
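
That wiring might look like the following on the GCP side (a sketch with placeholder topic and subscription names; treat `docs/Deployment-Storage.md` as authoritative):

```bash
# Route OBJECT_FINALIZE events from the bucket to a Pub/Sub topic
gcloud pubsub topics create dive-bucket-events
gsutil notification create -t dive-bucket-events -f json \
  -e OBJECT_FINALIZE gs://<your-bucket>

# Push-deliver topic messages to the DIVE endpoint
gcloud pubsub subscriptions create dive-bucket-push \
  --topic dive-bucket-events \
  --push-endpoint "https://<server>/api/v1/bucket_notifications/gcs"
```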


114 changes: 114 additions & 0 deletions samples/scripts/pushNotifications/modify_bucket.py
@@ -0,0 +1,114 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.8"
# dependencies = [
# "click",
# ]
# ///
import subprocess
import click
import os
import uuid


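# NOTE: these helpers assume the 'local' mc alias was already configured
# inside the minio_client container (setup_minio.py is expected to set it up).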
def mc_cmd(*args, capture_output=False):
    """Run an mc command inside the minio_client container."""
    result = subprocess.run(
        ["docker", "exec", "minio_client", "mc", *args],
        check=True,
        text=True,
        capture_output=capture_output,
    )
    return result.stdout if capture_output else None


def docker_exec_mkdir(container_path):
    """Ensure a directory exists inside the container."""
    subprocess.run(
        ["docker", "exec", "minio_client", "mkdir", "-p", container_path],
        check=True,
    )


def docker_cp_to_container(local_path, container_path):
    """Copy a local file or directory into the minio_client container."""
    container_dir = os.path.dirname(container_path)
    docker_exec_mkdir(container_dir)
    subprocess.run(
        ["docker", "cp", local_path, f"minio_client:{container_path}"],
        check=True,
    )


def docker_exec_rm(container_path):
    """Remove temporary files/folders from inside the container."""
    subprocess.run(
        ["docker", "exec", "minio_client", "rm", "-rf", container_path],
        check=False,
    )


@click.group()
def cli():
    """CLI for managing MinIO bucket objects."""
    pass


@cli.command()
@click.option("--bucket", required=True, help="Bucket name, e.g., dive-sample-data")
@click.option("--prefix", required=True, help="Destination prefix inside the bucket")
@click.option("--local-path", required=True, type=click.Path(exists=True), help="Local folder or file to upload")
def upload(bucket, prefix, local_path):
    """Upload a file or folder to bucket/prefix."""
    temp_id = uuid.uuid4().hex
    container_temp_path = f"/tmp/upload_{temp_id}"
    # Build the in-container path with a POSIX-style join; os.path.join would
    # produce backslashes on a Windows host.
    container_upload_path = f"{container_temp_path}/{os.path.basename(local_path)}"

    click.echo(f"Copying {local_path} into container...")
    docker_exec_mkdir(container_temp_path)
    docker_cp_to_container(local_path, container_upload_path)

    try:
        click.echo("Uploading to MinIO...")
        mc_cmd("cp", "-r", container_upload_path, f"local/{bucket}/{prefix}")
        click.echo("✅ Upload complete")
    finally:
        click.echo("Cleaning up temporary files...")
        docker_exec_rm(container_temp_path)


@cli.command()
@click.option("--bucket", required=True)
@click.option("--object", "object_path", required=True, help="Object path inside bucket to overwrite")
@click.option("--local-file", required=True, type=click.Path(exists=True))
def overwrite(bucket, object_path, local_file):
    """Overwrite a single object with a local file."""
    temp_id = uuid.uuid4().hex
    container_temp_dir = f"/tmp/overwrite_{temp_id}"
    # POSIX-style join for the in-container path (see note in upload()).
    container_temp_file = f"{container_temp_dir}/{os.path.basename(local_file)}"

    click.echo(f"Copying {local_file} into container...")
    docker_exec_mkdir(container_temp_dir)
    docker_cp_to_container(local_file, container_temp_file)

    try:
        click.echo("Overwriting object in MinIO...")
        mc_cmd("cp", container_temp_file, f"local/{bucket}/{object_path}")
        click.echo("✅ Overwrite complete")
    finally:
        click.echo("Cleaning up temporary files...")
        docker_exec_rm(container_temp_dir)


@cli.command()
@click.option("--bucket", required=True)
@click.option("--object", "object_path", required=True)
def delete(bucket, object_path):
    """Delete an object from the bucket."""
    click.echo(f"Deleting object: {bucket}/{object_path}...")
    mc_cmd("rm", f"local/{bucket}/{object_path}")
    click.echo("✅ Delete complete")


if __name__ == "__main__":
    cli()
51 changes: 51 additions & 0 deletions samples/scripts/pushNotifications/send_gcs_push.py
@@ -0,0 +1,51 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.8"
# dependencies = [
# "click",
# "requests",
# ]
# ///
import click
import requests
import json
from datetime import datetime, timezone
import base64
import uuid


def build_payload(bucket_id: str, object_id: str, event_type: str = "OBJECT_FINALIZE") -> dict:
    now = datetime.now(timezone.utc).isoformat()
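    # Minimal placeholder body (base64 of b"{}\n"); the fields the server
    # routes on (bucketId, objectId, eventType) travel in the attributes.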
    fake_data = base64.b64encode(b"{}\n").decode("utf-8")
    return {
        "message": {
            "attributes": {
                "bucketId": bucket_id,
                "objectId": object_id,
                "eventType": event_type,
            },
            "data": fake_data,
            "messageId": str(uuid.uuid4()),
            "publishTime": now,
        },
        "subscription": "local-testing",
    }


@click.command()
@click.option("--server", default="http://localhost:8010", show_default=True, help="DIVE server base URL")
@click.option("--bucket", default="dive-sample-data", required=True, help="Bucket name as configured in assetstore")
@click.option("--object", "object_path", required=True, help="Object path that was finalized")
def main(server, bucket, object_path):
url = f"{server}/api/v1/bucket_notifications/gcs"
payload = build_payload(bucket, object_path)
resp = requests.post(url, json={"message": payload["message"], "subscription": payload["subscription"]})
click.echo(f"POST {url} -> {resp.status_code}")
if resp.status_code >= 400:
click.echo(resp.text)


if __name__ == "__main__":
    main()