Skip to content

Commit 94ae782

Browse files
authored
ogc api processes subscriber (#1313)
* Exclude None from `get_processor` return type annotation An exception is raised in case of error, so it can't ever return None * Add support for OGC API Processes Subscriber The subscription URLs are passed to the manager, which then has to call them appropriately. By default, managers have the attribute `supports_subscribing` set to `False` in order to not break the API for these. The subscriptions are only passed to if this is set to `True` * Add ogc api callback class to conformance https://docs.ogc.org/is/18-062r2/18-062r2.html#toc67 * Make successUri mandatory in subscriber It's mandatory in the standard. Thx @ricardogsilva ! * Use snake case in python for fields which are camel case in the api Thx @ricardogsilva ! * Add subscriber to method docstring * Provide default value for subscriber for managers not supporting it Thanks @ricardogsilva ! * Factor out notification call into methods This increases reusability by other managers Thanks @ricardogsilva ! * Add an example call for a process subscriber * Change test urls to valid urls * Third party imports in own block
1 parent ab4fe09 commit 94ae782

File tree

7 files changed

+122
-8
lines changed

7 files changed

+122
-8
lines changed

docs/source/data-publishing/ogcapi-processes.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ Processing examples
108108
-H "Prefer: respond-async"
109109
-d "{\"inputs\":{\"name\": \"hi there2\"}}"
110110
111+
# execute a job for the ``hello-world`` process with a success subscriber
112+
curl -X POST http://localhost:5000/processes/hello-world/execution \
113+
-H "Content-Type: application/json" \
114+
-d "{\"inputs\":{\"name\": \"hi there2\"}, \
115+
\"subscriber\": {\"successUri\": \"https://www.example.com/success\"}}"
116+
111117
112118
.. _`OGC API - Processes`: https://ogcapi.ogc.org/processes
113119
.. _`sample`: https://github.com/geopython/pygeoapi/blob/master/pygeoapi/process/hello_world.py

pygeoapi/api.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
TEMPLATES, to_json, get_api_rules, get_base_url,
8787
get_crs_from_uri, get_supported_crs_list,
8888
modify_pygeofilter, CrsTransformSpec,
89-
transform_bbox)
89+
transform_bbox, Subscriber)
9090

9191
LOGGER = logging.getLogger(__name__)
9292

@@ -174,6 +174,7 @@
174174
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/core',
175175
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/json',
176176
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/oas30'
177+
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/callback',
177178
],
178179
'edr': [
179180
'http://www.opengis.net/spec/ogcapi-edr-1/1.0/conf/core'
@@ -3492,6 +3493,23 @@ def execute_process(self, request: Union[APIRequest, Any],
34923493
data_dict = data.get('inputs', {})
34933494
LOGGER.debug(data_dict)
34943495

3496+
subscriber = None
3497+
subscriber_dict = data.get('subscriber')
3498+
if subscriber_dict:
3499+
try:
3500+
success_uri = subscriber_dict['successUri']
3501+
except KeyError:
3502+
return self.get_exception(
3503+
HTTPStatus.BAD_REQUEST, headers, request.format,
3504+
'MissingParameterValue', 'Missing successUri')
3505+
else:
3506+
subscriber = Subscriber(
3507+
# NOTE: successUri is mandatory according to the standard
3508+
success_uri=success_uri,
3509+
in_progress_uri=subscriber_dict.get('inProgressUri'),
3510+
failed_uri=subscriber_dict.get('failedUri'),
3511+
)
3512+
34953513
try:
34963514
execution_mode = RequestedProcessExecutionMode(
34973515
request.headers.get('Prefer', request.headers.get('prefer'))
@@ -3501,7 +3519,11 @@ def execute_process(self, request: Union[APIRequest, Any],
35013519
try:
35023520
LOGGER.debug('Executing process')
35033521
result = self.manager.execute_process(
3504-
process_id, data_dict, execution_mode=execution_mode)
3522+
process_id,
3523+
data_dict,
3524+
execution_mode=execution_mode,
3525+
subscriber=subscriber,
3526+
)
35053527
job_id, mime_type, outputs, status, additional_headers = result
35063528
headers.update(additional_headers or {})
35073529
headers['Location'] = f'{self.base_url}/jobs/{job_id}'

pygeoapi/process/manager/base.py

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
from typing import Any, Dict, Tuple, Optional, OrderedDict
3939
import uuid
4040

41+
import requests
42+
4143
from pygeoapi.plugin import load_plugin
4244
from pygeoapi.process.base import (
4345
BaseProcessor,
@@ -50,6 +52,7 @@
5052
JobStatus,
5153
ProcessExecutionMode,
5254
RequestedProcessExecutionMode,
55+
Subscriber,
5356
)
5457

5558
LOGGER = logging.getLogger(__name__)
@@ -70,6 +73,7 @@ def __init__(self, manager_def: dict):
7073

7174
self.name = manager_def['name']
7275
self.is_async = False
76+
self.supports_subscribing = False
7377
self.connection = manager_def.get('connection')
7478
self.output_dir = manager_def.get('output_dir')
7579

@@ -85,7 +89,7 @@ def __init__(self, manager_def: dict):
8589
for id_, process_conf in manager_def.get('processes', {}).items():
8690
self.processes[id_] = dict(process_conf)
8791

88-
def get_processor(self, process_id: str) -> Optional[BaseProcessor]:
92+
def get_processor(self, process_id: str) -> BaseProcessor:
8993
"""Instantiate a processor.
9094
9195
:param process_id: Identifier of the process
@@ -178,7 +182,9 @@ def delete_job(self, job_id: str) -> bool:
178182
raise JobNotFoundError()
179183

180184
def _execute_handler_async(self, p: BaseProcessor, job_id: str,
181-
data_dict: dict) -> Tuple[str, None, JobStatus]:
185+
data_dict: dict,
186+
subscriber: Optional[Subscriber] = None,
187+
) -> Tuple[str, None, JobStatus]:
182188
"""
183189
This private execution handler executes a process in a background
184190
thread using `multiprocessing.dummy`
@@ -194,13 +200,15 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str,
194200
"""
195201
_process = dummy.Process(
196202
target=self._execute_handler_sync,
197-
args=(p, job_id, data_dict)
203+
args=(p, job_id, data_dict, subscriber)
198204
)
199205
_process.start()
200206
return 'application/json', None, JobStatus.accepted
201207

202208
def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
203-
data_dict: dict) -> Tuple[str, Any, JobStatus]:
209+
data_dict: dict,
210+
subscriber: Optional[Subscriber] = None,
211+
) -> Tuple[str, Any, JobStatus]:
204212
"""
205213
Synchronous execution handler
206214
@@ -233,6 +241,7 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
233241
}
234242

235243
self.add_job(job_metadata)
244+
self._send_in_progress_notification(subscriber)
236245

237246
try:
238247
if self.output_dir is not None:
@@ -276,6 +285,7 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
276285
}
277286

278287
self.update_job(job_id, job_update_metadata)
288+
self._send_success_notification(subscriber, outputs=outputs)
279289

280290
except Exception as err:
281291
# TODO assess correct exception type and description to help users
@@ -308,13 +318,16 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
308318

309319
self.update_job(job_id, job_metadata)
310320

321+
self._send_failed_notification(subscriber)
322+
311323
return jfmt, outputs, current_status
312324

313325
def execute_process(
314326
self,
315327
process_id: str,
316328
data_dict: dict,
317-
execution_mode: Optional[RequestedProcessExecutionMode] = None
329+
execution_mode: Optional[RequestedProcessExecutionMode] = None,
330+
subscriber: Optional[Subscriber] = None,
318331
) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]:
319332
"""
320333
Default process execution handler
@@ -323,6 +336,7 @@ def execute_process(
323336
:param data_dict: `dict` of data parameters
324337
:param execution_mode: `str` optionally specifying sync or async
325338
processing.
339+
:param subscriber: `Subscriber` optionally specifying callback urls
326340
327341
:raises UnknownProcessError: if the input process_id does not
328342
correspond to a known process
@@ -367,9 +381,39 @@ def execute_process(
367381
response_headers = None
368382
# TODO: handler's response could also be allowed to include more HTTP
369383
# headers
370-
mime_type, outputs, status = handler(processor, job_id, data_dict)
384+
mime_type, outputs, status = handler(
385+
processor,
386+
job_id,
387+
data_dict,
388+
# only pass subscriber if supported, otherwise this breaks existing
389+
# managers
390+
**({'subscriber': subscriber} if self.supports_subscribing else {})
391+
)
371392
return job_id, mime_type, outputs, status, response_headers
372393

394+
def _send_in_progress_notification(self, subscriber: Optional[Subscriber]):
395+
if subscriber and subscriber.in_progress_uri:
396+
response = requests.post(subscriber.in_progress_uri, json={})
397+
LOGGER.debug(
398+
f'In progress notification response: {response.status_code}'
399+
)
400+
401+
def _send_success_notification(
402+
self, subscriber: Optional[Subscriber], outputs: Any
403+
):
404+
if subscriber:
405+
response = requests.post(subscriber.success_uri, json=outputs)
406+
LOGGER.debug(
407+
f'Success notification response: {response.status_code}'
408+
)
409+
410+
def _send_failed_notification(self, subscriber: Optional[Subscriber]):
411+
if subscriber and subscriber.failed_uri:
412+
response = requests.post(subscriber.failed_uri, json={})
413+
LOGGER.debug(
414+
f'Failed notification response: {response.status_code}'
415+
)
416+
373417
def __repr__(self):
374418
return f'<BaseManager> {self.name}'
375419

pygeoapi/process/manager/mongodb_.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class MongoDBManager(BaseManager):
4545
def __init__(self, manager_def):
4646
super().__init__(manager_def)
4747
self.is_async = True
48+
self.supports_subscribing = True
4849

4950
def _connect(self):
5051
try:

pygeoapi/process/manager/tinydb_.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ def __init__(self, manager_def: dict):
6161

6262
super().__init__(manager_def)
6363
self.is_async = True
64+
self.supports_subscribing = True
6465

6566
@contextmanager
6667
def _db(self):

pygeoapi/util.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,17 @@ class JobStatus(Enum):
591591
dismissed = 'dismissed'
592592

593593

594+
@dataclass(frozen=True)
595+
class Subscriber:
596+
"""Store subscriber urls as defined in:
597+
598+
https://schemas.opengis.net/ogcapi/processes/part1/1.0/openapi/schemas/subscriber.yaml # noqa
599+
"""
600+
success_uri: str
601+
in_progress_uri: Optional[str]
602+
failed_uri: Optional[str]
603+
604+
594605
def read_data(path: Union[Path, str]) -> Union[bytes, str]:
595606
"""
596607
helper function to read data (file or network)

tests/test_api.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import time
3737
import gzip
3838
from http import HTTPStatus
39+
from unittest import mock
3940

4041
from pyld import jsonld
4142
import pytest
@@ -1738,6 +1739,16 @@ def test_execute_process(config, api_):
17381739
'name': None
17391740
}
17401741
}
1742+
req_body_7 = {
1743+
'inputs': {
1744+
'name': 'Test'
1745+
},
1746+
'subscriber': {
1747+
'successUri': 'https://example.com/success',
1748+
'inProgressUri': 'https://example.com/inProgress',
1749+
'failedUri': 'https://example.com/failed',
1750+
}
1751+
}
17411752

17421753
cleanup_jobs = set()
17431754

@@ -1865,6 +1876,24 @@ def test_execute_process(config, api_):
18651876
assert isinstance(response, dict)
18661877
assert code == HTTPStatus.CREATED
18671878

1879+
cleanup_jobs.add(tuple(['hello-world',
1880+
rsp_headers['Location'].split('/')[-1]]))
1881+
1882+
req = mock_request(data=req_body_7)
1883+
with mock.patch(
1884+
'pygeoapi.process.manager.base.requests.post'
1885+
) as post_mocker:
1886+
rsp_headers, code, response = api_.execute_process(req, 'hello-world')
1887+
assert code == HTTPStatus.OK
1888+
post_mocker.assert_any_call(
1889+
req_body_7['subscriber']['inProgressUri'], json={}
1890+
)
1891+
post_mocker.assert_any_call(
1892+
req_body_7['subscriber']['successUri'],
1893+
json={'id': 'echo', 'value': 'Hello Test!'}
1894+
)
1895+
assert post_mocker.call_count == 2
1896+
18681897
cleanup_jobs.add(tuple(['hello-world',
18691898
rsp_headers['Location'].split('/')[-1]]))
18701899

0 commit comments

Comments
 (0)