diff --git a/api/views/datasources.py b/api/views/datasources.py
index 137a4cf61a431570114f994f14fa1983955014f8..efe38df9b06c52a391513589977d4d14fad39c66 100644
--- a/api/views/datasources.py
+++ b/api/views/datasources.py
@@ -253,7 +253,7 @@ class DataSourceApiViewset(viewsets.ReadOnlyModelViewSet):
                     status=200
                 )
 
-            except models.pipeline.BasePipelineError as e:
+            except (models.pipeline.PipelineRuntimeError, models.pipeline.PipelineSetupError) as e:
                 return JsonResponse({
                     'status': 'error',
                     'message': str(e),
diff --git a/datasources/admin.py b/datasources/admin.py
index c6a9c6ba6bcd44cdbf159527a5a254fdcd483795..41eaef05654691953c034ae67e0b622d6254a9a1 100644
--- a/datasources/admin.py
+++ b/datasources/admin.py
@@ -8,10 +8,30 @@ class MetadataFieldAdmin(admin.ModelAdmin):
     pass
 
 
+@admin.register(models.MetadataItem)
+class MetadataItemAdmin(admin.ModelAdmin):
+    pass
+
+
+class MetadataItemInline(admin.TabularInline):
+    model = models.MetadataItem
+    readonly_fields = ('field', 'value',)
+    extra = 0
+
+    def has_add_permission(self, request, obj=None):
+        return False
+
+    def has_delete_permission(self, request, obj=None):
+        return False
+
+
 @admin.register(models.DataSource)
 class DataSourceAdmin(admin.ModelAdmin):
     readonly_fields = ['owner']
     form = forms.DataSourceForm
+    inlines = [
+        MetadataItemInline,
+    ]
 
     def has_change_permission(self, request, obj=None) -> bool:
         """
diff --git a/datasources/models/datasource.py b/datasources/models/datasource.py
index e6b66f57d468b31065c03e4fe9e133db47fdb222..d0b062d4e99c9b626658cf1e2c5fc7b8d38afad7 100644
--- a/datasources/models/datasource.py
+++ b/datasources/models/datasource.py
@@ -373,8 +373,12 @@ class DataSource(ProvAbleModel, BaseAppDataModel):
 
             data = data_connector.get_data(params=params)
 
-        return self.pipeline(data)
-
+        return self.pipeline(
+            data,
+            options={
+                item.field.short_name: item.value for item in self.metadata_items.all()
+            }
+        )
 
     @property
     def search_representation(self) -> str:
diff --git a/datasources/models/metadata.py b/datasources/models/metadata.py
index b018132ea274e67ba1457dd18a47bbdd5f1fb3ba..156ba5cbd56737538e00bf4c5583e149aafa3b07 100644
--- a/datasources/models/metadata.py
+++ b/datasources/models/metadata.py
@@ -56,6 +56,7 @@ class MetadataField(models.Model):
         fixtures = (
             ('data_query_param', 'data_query_param', True),
             ('indexed_field', 'indexed_field', True),
+            ('schema', 'schema', True),
         )
 
         for name, short_name, operational in fixtures:
diff --git a/datasources/models/pipeline.py b/datasources/models/pipeline.py
index e6bbd22de48d657f1ea39e26f986659535ac0a0d..fa9c23c81b88e30dcd47acd8576086aabc83d608 100644
--- a/datasources/models/pipeline.py
+++ b/datasources/models/pipeline.py
@@ -7,11 +7,15 @@ from core.models import MAX_LENGTH_NAME
 from ..pipeline.base import BasePipelineStage
 
 
-class BasePipelineError(BaseException):
+class PipelineRuntimeError(Exception):
     pass
 
 
-class PipelineValidationError(BasePipelineError):
+class PipelineValidationError(PipelineRuntimeError):
+    pass
+
+
+class PipelineSetupError(Exception):
     pass
 
 
@@ -35,7 +39,8 @@ class Pipeline(models.Model):
     name = models.CharField(max_length=MAX_LENGTH_NAME,
                             blank=False, null=False)
 
-    def __call__(self, data: typing.Mapping, *args, **kwargs) -> typing.Mapping:
+    def __call__(self, data: typing.Mapping,
+                 options: typing.Optional[typing.Mapping] = None) -> typing.Mapping:
         """
         Run data through this pipeline.
 
@@ -43,7 +48,7 @@ class Pipeline(models.Model):
         :return: Processed data
         """
         for stage in self.stages.all():
-            data = stage(data)
+            data = stage(data, options=options)
 
         return data
 
@@ -68,7 +73,8 @@ class PipelineStage(models.Model):
                                  related_name='stages',
                                  blank=False, null=False)
 
-    def __call__(self, data: typing.Mapping, *args, **kwargs) -> typing.Mapping:
+    def __call__(self, data: typing.Mapping,
+                 options: typing.Optional[typing.Mapping] = None) -> typing.Mapping:
         """
         Run data through this pipeline stage.
 
@@ -79,7 +85,7 @@ class PipelineStage(models.Model):
 
         plugin = BasePipelineStage.get_plugin(self.plugin_name)
 
-        return plugin()(data)
+        return plugin(options=options)(data)
 
     def __str__(self):
         return self.plugin_name
diff --git a/datasources/pipeline/base.py b/datasources/pipeline/base.py
index 0ab6e53adb674bdf883bcc2be4e1c6e23671d83e..205d8856cf017ceb74f0b7a7dd6140a9df4720b6 100644
--- a/datasources/pipeline/base.py
+++ b/datasources/pipeline/base.py
@@ -3,8 +3,11 @@ This module contains classes required to build a data pipeline from a series of
 """
 
 import abc
+import json
 import typing
 
+import jsonschema
+
 from core import plugin
 from .. import models
 
@@ -13,7 +16,7 @@ class BasePipelineStage(metaclass=plugin.Plugin):
     #: Help string to be shown when a user is building a pipeline
     description = None
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, options: typing.Optional[typing.Mapping] = None):
         pass
 
     @abc.abstractmethod
@@ -39,5 +42,20 @@ class JsonValidationPipelineStage(BasePipelineStage):
     #: Help string to be shown when a user is building a pipeline
-    description = 'Raise an error'
+    description = 'Validate data against a JSON schema'
 
+    def __init__(self, options: typing.Optional[typing.Mapping] = None):
+        super().__init__(options)
+
+        try:
+            self.schema = options['schema']
+
+        except (KeyError, TypeError) as e:  # options may be None or missing 'schema'
+            raise models.pipeline.PipelineSetupError('Schema has not been defined') from e
+
     def __call__(self, data: typing.Mapping) -> typing.Mapping:
-        raise models.pipeline.PipelineValidationError('Data failed validation')
+        try:
+            jsonschema.validate(data, json.loads(self.schema))
+
+        except jsonschema.ValidationError as e:
+            raise models.pipeline.PipelineValidationError('Data failed schema validation: ' + str(e)) from e
+
+        return data