Skip to content
Snippets Groups Projects
Select Git revision
  • 498ece492015a5b0bc01b04b70e3862fbbabf562
  • master default protected
  • pipeline
  • dev
  • quality
  • docs
  • issue-65
  • issue-30
  • issue-19
  • issue-28
  • 0.1.1
  • 0.1.0
12 results

models.py

Blame
  • models.py 7.64 KiB
    import enum
    import json
    import typing
    
    from django.conf import settings
    from django.contrib.auth.models import Group
    from django.db import models
    from django.urls import reverse
    import requests
    import requests.exceptions
    
    from core.models import BaseAppDataModel, MAX_LENGTH_API_KEY, MAX_LENGTH_NAME, MAX_LENGTH_PATH
    from datasources.connectors.base import AuthMethod, BaseDataConnector, REQUEST_AUTH_FUNCTIONS
    
    
    @enum.unique
    class UserPermissionLevels(enum.IntEnum):
        """
        User permission levels on data sources.

        Levels are ordered integers, so they can be compared directly
        (e.g. ``granted >= UserPermissionLevels.VIEW``).
        """
        #: No permissions
        NONE = 0

        #: Permission to view in PEDASI UI
        VIEW = 1

        #: Permission to query metadata via API / UI
        META = 2

        #: Permission to query data via API / UI
        DATA = 3

        #: Permission to query PROV via API / UI
        PROV = 4

        @classmethod
        def choices(cls):
            """
            Get the permission levels in the form expected by the ``choices`` option of a Django field.

            Django requires each entry to be ``(stored value, human readable label)``,
            so the integer value comes first and the member name is used as the label.

            :return: Tuple of ``(value, name)`` pairs in definition order
            """
            return tuple((level.value, level.name) for level in cls)
    
    
    class UserPermissionLink(models.Model):
        """
        Joining table between users and data sources, recording a permission level for each pairing.

        Each row stores both the level that has been granted and the level that has been requested.
        """
        #: User whose permissions this row describes
        user = models.ForeignKey(
            settings.AUTH_USER_MODEL,
            on_delete=models.CASCADE
        )

        #: Data source to which the permission levels apply
        datasource = models.ForeignKey(
            'DataSource',
            on_delete=models.CASCADE
        )

        #: Permission level which has been granted - defaults to no permissions
        granted = models.IntegerField(
            choices=UserPermissionLevels.choices(),
            default=UserPermissionLevels.NONE,
            null=False,
            blank=False
        )

        #: Permission level which has been requested - defaults to no permissions
        requested = models.IntegerField(
            choices=UserPermissionLevels.choices(),
            default=UserPermissionLevels.NONE,
            null=False,
            blank=False
        )
    
    
    class DataSource(BaseAppDataModel):
        """
        Manage access to a data source API.

        Will provide functionality to:

        * Query data (with query params)
        * Query metadata (with query params)
        * Track provenance of the data source itself
        * Track provenance of data accesses

        NOTE(review): ``name``, ``description``, ``url`` and ``access_control`` are used below but
        not defined in this class - presumably they come from ``BaseAppDataModel``; confirm there.
        """
        #: User who has responsibility for this data source
        owner = models.ForeignKey(settings.AUTH_USER_MODEL,
                                  limit_choices_to={
                                      'groups__name': 'Data providers'
                                  },
                                  on_delete=models.PROTECT,
                                  related_name='datasources',
                                  blank=False, null=False)

        #: Information required to initialise the connector for this data source
        _connector_string = models.CharField(max_length=MAX_LENGTH_PATH,
                                             blank=True, null=False)

        #: Name of plugin which allows interaction with this data source
        plugin_name = models.CharField(max_length=MAX_LENGTH_NAME,
                                       blank=False, null=False)

        #: If the data source API requires an API key use this one
        api_key = models.CharField(max_length=MAX_LENGTH_API_KEY,
                                   blank=True, null=False)

        #: Which authentication method to use - defined in :class:`datasources.connectors.base.AuthMethod` enum
        auth_method = models.IntegerField(choices=AuthMethod.choices(),
                                          default=AuthMethod.UNKNOWN.value,
                                          editable=False, blank=False, null=False)

        #: Users - linked via a permission table - see :class:`UserPermissionLink`
        users = models.ManyToManyField(settings.AUTH_USER_MODEL,
                                       through=UserPermissionLink)

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Connector instance is built on first access to `data_connector` and then reused
            self._data_connector = None

        def save(self, *args, **kwargs):
            """
            Save the data source, first determining the authentication method if one is not yet set.

            Positional and keyword arguments are passed through unchanged to the parent ``save``.

            :return: Whatever the parent ``save`` returns
            """
            # Find out which auth method to use
            if not self.auth_method:
                self.auth_method = self._determine_auth().value

            return super().save(*args, **kwargs)

        def has_view_permission(self, user: settings.AUTH_USER_MODEL) -> bool:
            """
            Does a user have permission to view this data source in the PEDASI UI?

            Permission is given if access control is disabled for this data source, if the user is
            the owner, or if the user has been granted at least the VIEW permission level.

            :param user: User to check
            :return: User has permission?
            """
            if not self.access_control:
                return True

            if self.owner == user:
                return True

            try:
                permission = UserPermissionLink.objects.get(
                    user=user,
                    datasource=self
                )
            except UserPermissionLink.DoesNotExist:
                # No permission record for this user means nothing has been granted
                return False

            return permission.granted >= UserPermissionLevels.VIEW

        @property
        def is_catalogue(self) -> bool:
            """
            Value of the ``is_catalogue`` attribute of this source's data connector class.

            :return: Connector class' ``is_catalogue`` flag
            """
            return self.data_connector_class.is_catalogue

        @property
        def connector_string(self):
            """
            String used to initialise the data connector.

            :return: The stored connector string if it is non-empty, otherwise the data source URL
            """
            if self._connector_string:
                return self._connector_string
            return self.url

        @property
        def data_connector_class(self) -> typing.Type[BaseDataConnector]:
            """
            Get the data connector class for this source.

            :return: Data connector class
            :raises ValueError: Plugin lookup failed and no plugin name is set on this data source
            :raises KeyError: Plugin name is set but no plugin with that name was found
            """
            BaseDataConnector.load_plugins('datasources/connectors')

            try:
                plugin = BaseDataConnector.get_plugin(self.plugin_name)

            except KeyError as e:
                # Distinguish 'nothing configured' from 'configured plugin is missing'
                if not self.plugin_name:
                    raise ValueError('Data source plugin is not set') from e

                raise KeyError('Data source plugin not found') from e

            return plugin

        @property
        def data_connector(self) -> BaseDataConnector:
            """
            Construct the data connector for this source.

            The connector is constructed on first access and the same instance is returned afterwards.

            :return: Data connector instance
            """
            if self._data_connector is None:
                plugin = self.data_connector_class

                # Is the authentication method set?
                auth_method = AuthMethod(self.auth_method)
                if not auth_method:
                    auth_method = self._determine_auth()

                # Inject function to get authenticated request
                auth_class = REQUEST_AUTH_FUNCTIONS[auth_method]

                # Only pass the API key through to the connector if we actually have one
                if self.api_key:
                    self._data_connector = plugin(self.connector_string, self.api_key,
                                                  auth=auth_class)
                else:
                    self._data_connector = plugin(self.connector_string,
                                                  auth=auth_class)

            return self._data_connector

        @property
        def search_representation(self) -> str:
            """
            Multi-line text description of this data source.

            Contains the name, the owner's full name and the description, followed by the
            connector's metadata as indented JSON when it can be retrieved.

            :return: Newline separated text
            """
            lines = [
                self.name,
                self.owner.get_full_name(),
                self.description,
            ]

            try:
                lines.append(json.dumps(
                    self.data_connector.get_metadata(),
                    indent=4
                ))
            except (KeyError, NotImplementedError, ValueError):
                # KeyError: Plugin was not found
                # NotImplementedError: Plugin does not support metadata
                # ValueError: Plugin was not set
                pass

            result = '\n'.join(lines)
            return result

        def _determine_auth(self) -> AuthMethod:
            """
            Determine which authentication method the data source API accepts.

            Each entry of ``REQUEST_AUTH_FUNCTIONS`` is tried in turn by making a GET request to the
            data source URL; the first one which gives a successful response is used.

            :return: ``AuthMethod.NONE`` if there is no API key, the first auth method which gives a
                     successful response, or ``AuthMethod.UNKNOWN`` if none of them do
            """
            # If not using an API key - can't require auth
            if not self.api_key:
                return AuthMethod.NONE

            for auth_method_id, auth_function in REQUEST_AUTH_FUNCTIONS.items():
                try:
                    # Can we get a response using this auth method?
                    # NOTE(review): no timeout is set on this request - consider adding one
                    response = requests.get(self.url,
                                            auth=auth_function(self.api_key, ''))

                    response.raise_for_status()
                    return auth_method_id

                except (TypeError, requests.exceptions.HTTPError):
                    # TypeError: presumably an entry whose auth function is not callable - verify
                    # HTTPError: Request was rejected using this auth method
                    pass

            # Nothing worked - always return an AuthMethod so callers never receive None
            return AuthMethod.UNKNOWN

        def get_absolute_url(self):
            """
            Get the URL of the detail view for this data source.

            :return: URL reversed from ``datasources:datasource.detail`` using this object's primary key
            """
            return reverse('datasources:datasource.detail',
                           kwargs={'pk': self.pk})