From 0501dc0ce7a914de3b39acab3595b1b9cc0708e2 Mon Sep 17 00:00:00 2001
From: James Graham <J.Graham@software.ac.uk>
Date: Mon, 29 Apr 2019 15:48:58 +0100
Subject: [PATCH] Pipeline stage to validate JSON against schema - #73

---
 api/views/datasources.py         |  2 +-
 datasources/admin.py             | 20 ++++++++++++++++++++
 datasources/models/datasource.py |  8 ++++++--
 datasources/models/metadata.py   |  1 +
 datasources/models/pipeline.py   | 18 ++++++++++++------
 datasources/pipeline/base.py     | 22 ++++++++++++++++++++--
 6 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/api/views/datasources.py b/api/views/datasources.py
index 137a4cf..efe38df 100644
--- a/api/views/datasources.py
+++ b/api/views/datasources.py
@@ -253,7 +253,7 @@ class DataSourceApiViewset(viewsets.ReadOnlyModelViewSet):
                     status=200
                 )
 
-            except models.pipeline.BasePipelineError as e:
+            except (models.pipeline.PipelineRuntimeError, models.pipeline.PipelineSetupError) as e:
                 return JsonResponse({
                     'status': 'error',
                     'message': str(e),
diff --git a/datasources/admin.py b/datasources/admin.py
index c6a9c6b..41eaef0 100644
--- a/datasources/admin.py
+++ b/datasources/admin.py
@@ -8,10 +8,30 @@ class MetadataFieldAdmin(admin.ModelAdmin):
     pass
 
 
+@admin.register(models.MetadataItem)
+class MetadataItemAdmin(admin.ModelAdmin):
+    """Admin for MetadataItem with no customisation of the ModelAdmin defaults."""
+
+
+class MetadataItemInline(admin.TabularInline):
+    model = models.MetadataItem
+    readonly_fields = ('field', 'value',)
+    extra = 0
+
+    def has_add_permission(self, request, obj=None):  # Django >= 2.1 passes the parent object as obj
+        return False
+
+    def has_delete_permission(self, request, obj=None):
+        return False
+
+
 @admin.register(models.DataSource)
 class DataSourceAdmin(admin.ModelAdmin):
     readonly_fields = ['owner']
     form = forms.DataSourceForm
+    inlines = [
+        MetadataItemInline,
+    ]
 
     def has_change_permission(self, request, obj=None) -> bool:
         """
diff --git a/datasources/models/datasource.py b/datasources/models/datasource.py
index e6b66f5..d0b062d 100644
--- a/datasources/models/datasource.py
+++ b/datasources/models/datasource.py
@@ -373,8 +373,12 @@ class DataSource(ProvAbleModel, BaseAppDataModel):
 
             data = data_connector.get_data(params=params)
 
-        return self.pipeline(data)
-
+        return self.pipeline(
+            data,
+            options={
+                item.field.short_name: item.value for item in self.metadata_items.all()
+            }
+        )
 
     @property
     def search_representation(self) -> str:
diff --git a/datasources/models/metadata.py b/datasources/models/metadata.py
index b018132..156ba5c 100644
--- a/datasources/models/metadata.py
+++ b/datasources/models/metadata.py
@@ -56,6 +56,7 @@ class MetadataField(models.Model):
         fixtures = (
             ('data_query_param', 'data_query_param', True),
             ('indexed_field', 'indexed_field', True),
+            ('schema', 'schema', True),
         )
 
         for name, short_name, operational in fixtures:
diff --git a/datasources/models/pipeline.py b/datasources/models/pipeline.py
index e6bbd22..fa9c23c 100644
--- a/datasources/models/pipeline.py
+++ b/datasources/models/pipeline.py
@@ -7,11 +7,15 @@ from core.models import MAX_LENGTH_NAME
 from ..pipeline.base import BasePipelineStage
 
 
-class BasePipelineError(BaseException):
+class PipelineRuntimeError(Exception):
     pass
 
 
-class PipelineValidationError(BasePipelineError):
+class PipelineValidationError(PipelineRuntimeError):
+    """Raised when data fails validation in a pipeline stage."""
+
+
+class PipelineSetupError(Exception):
     pass
 
 
@@ -35,7 +39,8 @@ class Pipeline(models.Model):
     name = models.CharField(max_length=MAX_LENGTH_NAME,
                             blank=False, null=False)
 
-    def __call__(self, data: typing.Mapping, *args, **kwargs) -> typing.Mapping:
+    def __call__(self, data: typing.Mapping,
+                 options: typing.Optional[typing.Mapping] = None) -> typing.Mapping:
         """
         Run data through this pipeline.
 
@@ -43,7 +48,7 @@ class Pipeline(models.Model):
         :return: Processed data
         """
         for stage in self.stages.all():
-            data = stage(data)
+            data = stage(data, options=options)
 
         return data
 
@@ -68,7 +73,8 @@ class PipelineStage(models.Model):
                                  related_name='stages',
                                  blank=False, null=False)
 
-    def __call__(self, data: typing.Mapping, *args, **kwargs) -> typing.Mapping:
+    def __call__(self, data: typing.Mapping,
+                 options: typing.Optional[typing.Mapping] = None) -> typing.Mapping:
         """
         Run data through this pipeline stage.
 
@@ -79,7 +85,7 @@ class PipelineStage(models.Model):
 
         plugin = BasePipelineStage.get_plugin(self.plugin_name)
 
-        return plugin()(data)
+        return plugin(options=options)(data)
 
     def __str__(self):
         return self.plugin_name
diff --git a/datasources/pipeline/base.py b/datasources/pipeline/base.py
index 0ab6e53..205d885 100644
--- a/datasources/pipeline/base.py
+++ b/datasources/pipeline/base.py
@@ -3,8 +3,11 @@ This module contains classes required to build a data pipeline from a series of
 """
 
 import abc
+import json
 import typing
 
+import jsonschema
+
 from core import plugin
 from .. import models
 
@@ -13,7 +16,7 @@ class BasePipelineStage(metaclass=plugin.Plugin):
     #: Help string to be shown when a user is building a pipeline
     description = None
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, options: typing.Optional[typing.Mapping] = None):  # base stage ignores options
         pass
 
     @abc.abstractmethod
@@ -39,5 +42,20 @@ class JsonValidationPipelineStage(BasePipelineStage):
     #: Help string to be shown when a user is building a pipeline
-    description = 'Raise an error'
+    description = 'Validate data against a JSON schema'
 
+    def __init__(self, options: typing.Optional[typing.Mapping] = None):
+        super().__init__(options)
+
+        try:
+            self.schema = options['schema']
+
+        except (KeyError, TypeError) as e:  # TypeError: options is None (the default)
+            raise models.pipeline.PipelineSetupError('Schema has not been defined') from e
+
     def __call__(self, data: typing.Mapping) -> typing.Mapping:
-        raise models.pipeline.PipelineValidationError('Data failed validation')
+        try:
+            jsonschema.validate(data, json.loads(self.schema))
+        except jsonschema.ValidationError as e:
+            raise models.pipeline.PipelineValidationError('Failed validation stage: ' + str(e)) from e
+        except (ValueError, jsonschema.SchemaError) as e:  # malformed JSON text or invalid schema
+            raise models.pipeline.PipelineSetupError('Schema is not valid: ' + str(e)) from e
+        return data
-- 
GitLab