Skip to content
Snippets Groups Projects
Commit 0501dc0c authored by James Graham's avatar James Graham
Browse files

Pipeline stage to validate JSON against schema - #73

parent 382ba69b
No related branches found
No related tags found
No related merge requests found
......@@ -253,7 +253,7 @@ class DataSourceApiViewset(viewsets.ReadOnlyModelViewSet):
status=200
)
except models.pipeline.BasePipelineError as e:
except (models.pipeline.PipelineRuntimeError, models.pipeline.PipelineSetupError) as e:
return JsonResponse({
'status': 'error',
'message': str(e),
......
......@@ -8,10 +8,30 @@ class MetadataFieldAdmin(admin.ModelAdmin):
pass
@admin.register(models.MetadataItem)
class MetadataItemAdmin(admin.ModelAdmin):
pass
class MetadataItemInline(admin.TabularInline):
model = models.MetadataItem
readonly_fields = ('field', 'value',)
extra = 0
def has_add_permission(self, request):
return False
def has_delete_permission(self, request, obj=None):
return False
@admin.register(models.DataSource)
class DataSourceAdmin(admin.ModelAdmin):
readonly_fields = ['owner']
form = forms.DataSourceForm
inlines = [
MetadataItemInline,
]
def has_change_permission(self, request, obj=None) -> bool:
"""
......
......@@ -373,8 +373,12 @@ class DataSource(ProvAbleModel, BaseAppDataModel):
data = data_connector.get_data(params=params)
return self.pipeline(data)
return self.pipeline(
data,
options={
item.field.short_name: item.value for item in self.metadata_items.all()
}
)
@property
def search_representation(self) -> str:
......
......@@ -56,6 +56,7 @@ class MetadataField(models.Model):
fixtures = (
('data_query_param', 'data_query_param', True),
('indexed_field', 'indexed_field', True),
('schema', 'schema', True),
)
for name, short_name, operational in fixtures:
......
......@@ -7,11 +7,15 @@ from core.models import MAX_LENGTH_NAME
from ..pipeline.base import BasePipelineStage
class BasePipelineError(BaseException):
class PipelineRuntimeError(BaseException):
pass
class PipelineValidationError(BasePipelineError):
class PipelineValidationError(PipelineRuntimeError):
pass
class PipelineSetupError(BaseException):
pass
......@@ -35,7 +39,8 @@ class Pipeline(models.Model):
name = models.CharField(max_length=MAX_LENGTH_NAME,
blank=False, null=False)
def __call__(self, data: typing.Mapping, *args, **kwargs) -> typing.Mapping:
def __call__(self, data: typing.Mapping,
options: typing.Optional[typing.Mapping] = None) -> typing.Mapping:
"""
Run data through this pipeline.
......@@ -43,7 +48,7 @@ class Pipeline(models.Model):
:return: Processed data
"""
for stage in self.stages.all():
data = stage(data)
data = stage(data, options=options)
return data
......@@ -68,7 +73,8 @@ class PipelineStage(models.Model):
related_name='stages',
blank=False, null=False)
def __call__(self, data: typing.Mapping, *args, **kwargs) -> typing.Mapping:
def __call__(self, data: typing.Mapping,
options: typing.Optional[typing.Mapping] = None) -> typing.Mapping:
"""
Run data through this pipeline stage.
......@@ -79,7 +85,7 @@ class PipelineStage(models.Model):
plugin = BasePipelineStage.get_plugin(self.plugin_name)
return plugin()(data)
return plugin(options=options)(data)
def __str__(self):
return self.plugin_name
......@@ -3,8 +3,11 @@ This module contains classes required to build a data pipeline from a series of
"""
import abc
import json
import typing
import jsonschema
from core import plugin
from .. import models
......@@ -13,7 +16,7 @@ class BasePipelineStage(metaclass=plugin.Plugin):
#: Help string to be shown when a user is building a pipeline
description = None
def __init__(self, *args, **kwargs):
def __init__(self, options: typing.Optional[typing.Mapping] = None):
pass
@abc.abstractmethod
......@@ -39,5 +42,20 @@ class JsonValidationPipelineStage(BasePipelineStage):
#: Help string to be shown when a user is building a pipeline
description = 'Raise an error'
def __init__(self, options: typing.Optional[typing.Mapping] = None):
super().__init__(options)
try:
self.schema = options['schema']
except KeyError as e:
raise models.pipeline.PipelineSetupError('Schema has not been defined') from e
def __call__(self, data: typing.Mapping) -> typing.Mapping:
raise models.pipeline.PipelineValidationError('Data failed validation')
try:
jsonschema.validate(data, json.loads(self.schema))
except jsonschema.ValidationError as e:
raise models.pipeline.PipelineValidationError('Failed validation stage: ' + str(e))
return data
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment