Skip to content
Snippets Groups Projects
Commit dac77740 authored by James Graham's avatar James Graham
Browse files

Move determine_auth_method into data connector base class

parent 0445b33e
Branches
Tags
No related merge requests found
......@@ -6,7 +6,7 @@ from django.test import Client, TestCase
from rest_framework.authtoken.models import Token
from rest_framework.test import APIClient
from datasources import models
from datasources import connectors, models
class RootApiTest(TestCase):
......@@ -499,7 +499,7 @@ class DataSourceApiHyperCatTest(TestCase):
url=cls.test_url,
api_key=cls.api_key,
plugin_name=cls.plugin_name,
auth_method=models.DataSource.determine_auth_method(cls.test_url, cls.api_key)
auth_method=connectors.BaseDataConnector.determine_auth_method(cls.test_url, cls.api_key)
)
def setUp(self):
......
......@@ -100,6 +100,40 @@ class BaseDataConnector(metaclass=plugin.Plugin):
def request_count(self):
return self._request_counter.count()
@staticmethod
def determine_auth_method(url: str, api_key: str) -> AuthMethod:
"""
Determine which authentication method to use to access the data source.
Test each known authentication method in turn until one succeeds.
:param url: URL to authenticate against
:param api_key: API key to use for authentication
:return: First successful authentication method
"""
# If not using an API key - can't require auth
if not api_key:
return AuthMethod.NONE
for auth_method_id, auth_function in REQUEST_AUTH_FUNCTIONS.items():
try:
# Can we get a response using this auth method?
if auth_function is None:
response = requests.get(url)
else:
response = requests.get(url,
auth=auth_function(api_key, ''))
response.raise_for_status()
return auth_method_id
except requests.exceptions.HTTPError:
pass
# None of the attempted authentication methods was successful
raise requests.exceptions.ConnectionError('Could not authenticate against external API')
def get_metadata(self,
params: typing.Optional[typing.Mapping[str, str]] = None):
"""
......
......@@ -24,7 +24,8 @@ class DataSourceForm(forms.ModelForm):
cleaned_data = super().clean()
try:
cleaned_data['auth_method'] = models.DataSource.determine_auth_method(
# TODO construct and actual data connector instance here
cleaned_data['auth_method'] = connectors.BaseDataConnector.determine_auth_method(
cleaned_data['url'],
cleaned_data['api_key']
)
......
......@@ -404,7 +404,7 @@ class DataSource(BaseAppDataModel):
# Is the authentication method set?
auth_method = AuthMethod(self.auth_method)
if not auth_method:
auth_method = self.determine_auth_method(self.url, self.api_key)
auth_method = plugin.determine_auth_method(self.url, self.api_key)
# Inject function to get authenticated request
auth_class = REQUEST_AUTH_FUNCTIONS[auth_method]
......@@ -475,40 +475,6 @@ class DataSource(BaseAppDataModel):
result = '\n'.join(lines)
return result
@staticmethod
def determine_auth_method(url: str, api_key: str) -> AuthMethod:
"""
Determine which authentication method to use to access the data source.
Test each known authentication method in turn until one succeeds.
:param url: URL to authenticate against
:param api_key: API key to use for authentication
:return: First successful authentication method
"""
# If not using an API key - can't require auth
if not api_key:
return AuthMethod.NONE
for auth_method_id, auth_function in REQUEST_AUTH_FUNCTIONS.items():
try:
# Can we get a response using this auth method?
if auth_function is None:
response = requests.get(url)
else:
response = requests.get(url,
auth=auth_function(api_key, ''))
response.raise_for_status()
return auth_method_id
except requests.exceptions.HTTPError:
pass
# None of the attempted authentication methods was successful
raise requests.exceptions.ConnectionError('Could not authenticate against external API')
def get_absolute_url(self):
return reverse('datasources:datasource.detail',
kwargs={'pk': self.pk})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment