Skip to content
Snippets Groups Projects
Commit bfcd9884 authored by James Graham's avatar James Graham
Browse files

Add unmanaged MongoDB connector and tests - #19

parent b6d78946
No related branches found
No related tags found
No related merge requests found
import json
import typing
import pymongo
from .base import DataSetConnector, DummyRequestsResponse
class UnmanagedMongoConnector(DataSetConnector):
"""
This connector handles requests for data from a MongoDB collection managed outside of PEDASI.
"""
def __init__(self, location: str,
api_key: typing.Optional[str] = None,
auth: typing.Optional[typing.Callable] = None,
**kwargs):
super().__init__(location, api_key, auth, **kwargs)
# TODO validate db url
host, database, collection = location.rsplit('/', 2)
self._client = pymongo.MongoClient(host)
self._database = self._client[database]
self._collection = self._database[collection]
def get_response(self,
params: typing.Optional[typing.Mapping[str, str]] = None):
"""
Select and return all contents of table.
"""
result = self._collection.find(filter=params)
return DummyRequestsResponse(
json.dumps([dict(row) for row in result], default=str),
200, content_type='application/json'
)
...@@ -143,3 +143,61 @@ class ConnectorUnmanagedSqlTest(TestCase): ...@@ -143,3 +143,61 @@ class ConnectorUnmanagedSqlTest(TestCase):
for record in result: for record in result:
self.assertEqual('datasources', record['app']) self.assertEqual('datasources', record['app'])
class UnmanagedMongoConnectorTest(TestCase):
# Use Django migrations table as test data
location = 'mongodb://localhost/prov/django_migrations'
def _get_connection(self) -> BaseDataConnector:
return self.plugin(self.location)
def setUp(self):
BaseDataConnector.load_plugins('datasources/connectors')
self.plugin = BaseDataConnector.get_plugin('UnmanagedMongoConnector')
def test_get_plugin(self):
self.assertIsNotNone(self.plugin)
def test_plugin_init(self):
connection = self._get_connection()
self.assertEqual(connection.location, self.location)
def test_plugin_type(self):
connection = self._get_connection()
self.assertFalse(connection.is_catalogue)
def test_plugin_get_data(self):
connection = self._get_connection()
result = connection.get_data()
self.assertEqual(list, type(result))
self.assertLessEqual(1, len(result))
first = result[0]
self.assertEqual(dict, type(first))
self.assertIn('id', first)
self.assertIn('app', first)
self.assertIn('name', first)
self.assertIn('applied', first)
def test_plugin_get_data_query(self):
connection = self._get_connection()
result = connection.get_data(params={'app': 'datasources'})
self.assertEqual(list, type(result))
self.assertLessEqual(1, len(result))
first = result[0]
self.assertEqual(dict, type(first))
self.assertIn('id', first)
self.assertIn('app', first)
self.assertIn('name', first)
self.assertIn('applied', first)
for record in result:
self.assertEqual('datasources', record['app'])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment