From: Felix Dörre Date: Sun, 10 Jan 2021 22:16:14 +0000 (+0100) Subject: Merge branch 'admin_cleanup' into 'master' X-Git-Url: https://code.wpia.club/?p=motion.git;a=commitdiff_plain;h=7f7b44cb967ae9b99ddcbf3879c92d448570d49c;hp=e48d9939220828479799f89e24d52ca684e0001c Merge branch 'admin_cleanup' into 'master' add: implement motion cleanup See merge request felixdoerre/motion!28 --- diff --git a/README.md b/README.md index 7644452..3bbaeb5 100644 --- a/README.md +++ b/README.md @@ -200,4 +200,18 @@ flask create-user "email address" "host" ``` -The application will return a message for the success. +The application will return a message for success. + +To mask motions use this command +``` +flask motion_masking motionidentifier motionurl host" + +``` + +where: + +* motionidentifier - the motion identifier or left part of it which should be cleaned +* motionurl - an url to a motion that is the reason for the cleanup +* host - host where the motions are located + +The application will return a message for success. diff --git a/motion.py b/motion.py index 21e3e16..c4337f9 100644 --- a/motion.py +++ b/motion.py @@ -12,6 +12,7 @@ from datetime import date, time, datetime from flask_language import Language, current_language import gettext import click +import re def get_db(): db = getattr(g, '_database', None) @@ -261,6 +262,11 @@ def init_db(): db.prepare("ALTER TABLE \"voter\" ALTER COLUMN \"host\" SET NOT NULL")() db.prepare("UPDATE \"schema_version\" SET \"version\"=6")() + if ver < 7: + with app.open_resource('sql/from_6.sql', mode='r') as f: + db.execute(f.read()) + db.prepare("UPDATE \"schema_version\" SET \"version\"=7")() + init_db() def is_in_ratelimit(group): @@ -305,6 +311,12 @@ def rel_redirect(loc): r.autocorrect_location_header = False return r +def write_proxy_log(userid, action, comment): + get_db().prepare("INSERT INTO adminlog(user_id, action, comment, action_user_id) VALUES($1, $2, $3, $4)")(userid, action, comment, g.voter) + +def write_masking_log(comment): + get_db().prepare("INSERT INTO adminlog(user_id, action, comment, action_user_id) VALUES($1, 'motionmasking', $2, $1)")(0, comment) + @app.route("/motion", methods=['POST']) def put_motion(): cat=request.form.get("category", "") @@ -466,6 +478,7 @@ def add_proxy(): if rv[0].get("c") is None or rv[0].get("c") >= max_proxy: return _("Error, Max proxy for '%s' reached.") % (proxy), 400 rv = get_db().prepare("INSERT INTO proxy(voter_id, proxy_id, granted_by) VALUES ($1,$2,$3)")(voterid, proxyid, g.voter) + write_proxy_log(voterid, 'proxygranted', 'proxy: '+str(proxyid)) return rel_redirect("/proxy") @app.route("/proxy/revoke", methods=['POST']) @@ -474,6 +487,7 @@ def revoke_proxy(): return _('Forbidden'), 403 id=request.form.get("id", "") rv = get_db().prepare("UPDATE proxy SET revoked=CURRENT_TIMESTAMP, revoked_by=$1 WHERE id=$2")(g.voter, int(id)) + write_proxy_log(int(id), 'proxyrevoked', '') return rel_redirect("/proxy") @app.route("/proxy/revokeall", methods=['POST']) @@ -481,6 +495,7 @@ def revoke_proxy_all(): if not may_admin("proxyadmin"): return _('Forbidden'), 403 rv = get_db().prepare("UPDATE proxy SET revoked=CURRENT_TIMESTAMP, revoked_by=$1 WHERE revoked IS NULL")(g.voter) + write_proxy_log(g.voter, 'proxyrevokedall', '') return rel_redirect("/proxy") @app.route("/language/") @@ -500,3 +515,25 @@ def create_user(email, host): db.prepare("INSERT INTO voter(\"email\", \"host\") VALUES($1, $2)")(email, host) messagetext=_("User '%s' inserted to %s.") % (email, host) click.echo(messagetext) + +@app.cli.command("motion-masking") +@click.argument("motion") +@click.argument("motionreason") +@click.argument("host") +def motion_masking(motion, motionreason, host): + if re.search(r"[%_\\]", motion): + messagetext = _("No wildcards allowed for motion entry '%s'.") % (motion) + click.echo(messagetext) + else: + db = get_db() + with db.xact(): + rv = db.prepare("SELECT id FROM motion WHERE identifier LIKE $1 AND host = $2")(motion+"%", host) + count = len(rv) + messagetext = _("%s record(s) affected by masking of '%s'.") % (count, motion) + click.echo(messagetext) + if len(rv) != 0: + rv = db.prepare("SELECT id FROM motion WHERE content LIKE $1 AND host = $2")('%'+motionreason+"%", host) + rv = db.prepare("UPDATE motion SET name=$3, content=$4 WHERE identifier LIKE $1 AND host = $2 RETURNING id ")(motion+"%", host, _("Motion masked"), _("Motion masked on base of motion [%s](%s) on %s") % (motionreason, motionreason, datetime.now().strftime("%Y-%m-%d"))) + messagetext = _("%s record(s) updated by masking of '%s'.") % (len(rv), motion) + write_masking_log(_("%s motion(s) masked on base of motion %s with motion identifier '%s' on host %s") %(len(rv), motionreason, motion, host)) + click.echo(messagetext) diff --git a/sql/from_6.sql b/sql/from_6.sql new file mode 100644 index 0000000..095d54f --- /dev/null +++ b/sql/from_6.sql @@ -0,0 +1,10 @@ +DROP TABLE IF EXISTS adminlog; +DROP TYPE IF EXISTS "admin_log"; +CREATE TYPE "admin_log" AS ENUM ('motionmasking', 'proxygranted', 'proxyrevoked', 'proxyrevokedall'); +CREATE TABLE adminlog (id serial NOT NULL, + user_id INTEGER NOT NULL, + action admin_log NOT NULL, + comment text NULL, + action_user_id INTEGER NOT NULL, + actiontime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(id)); diff --git a/sql/schema.sql b/sql/schema.sql index 690425b..c3967a4 100644 --- a/sql/schema.sql +++ b/sql/schema.sql @@ -43,6 +43,17 @@ CREATE TABLE proxy (id serial NOT NULL, CREATE INDEX proxy_voter ON proxy (voter_id); CREATE INDEX proxy_proxy ON proxy (proxy_id); +DROP TABLE IF EXISTS adminlog; +DROP TYPE IF EXISTS "admin_log"; +CREATE TYPE "admin_log" AS ENUM ('motionmasking', 'proxygranted', 'proxyrevoked', 'proxyrevokedall'); +CREATE TABLE adminlog (id serial NOT NULL, + user_id INTEGER NOT NULL, + action admin_log NOT NULL, + comment text NULL, + action_user_id INTEGER NOT NULL, + actiontime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(id)); + DROP TABLE IF EXISTS schema_version; CREATE TABLE schema_version (version INTEGER NOT NULL); -INSERT INTO schema_version(version) VALUES(6); +INSERT INTO schema_version(version) VALUES(7); diff --git a/tests/test_basics.py b/tests/test_basics.py index b6f111c..0f27f48 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -73,10 +73,36 @@ class BasicTest(TestCase): + '\nNo '+str(no)+'
'\ + '\nAbstain '+str(abstain)+'' - + # functions handling or using database def open_DB(self): return postgresql.open(app.config.get("DATABASE"), user=app.config.get("USER"), password=app.config.get("PASSWORD")) + def db_select(self, sql, parameter): + with self.open_DB() as db: + rv = db.prepare(sql)(parameter) + return rv + + def db_select2(self, sql, parameter, parameter2): + with self.open_DB() as db: + rv = db.prepare(sql)(parameter, parameter2) + return rv + + def recordCountLog(self, parameter): + return self.recordCount("SELECT * FROM adminlog WHERE action=$1", parameter) + + def recordCount(self, sql, parameter): + rv = self.db_select(sql, parameter) + return len(rv) + + def logRecordDetailsTest(self, parameter, recordno, voterid, comment, actionuserid): + rv = self.db_select("SELECT * FROM adminlog WHERE action=$1 ORDER BY id", parameter) + self.assertEqual(voterid, rv[recordno].get("user_id")) + if comment: + self.assertEqual(comment, rv[recordno].get("comment")) + else: + self.assertEqual('', rv[recordno].get("comment")) + self.assertEqual(actionuserid, rv[recordno].get("action_user_id")) + # functions to clear database def db_clear(self): with self.open_DB() as db: diff --git a/tests/test_motion.py b/tests/test_motion.py index 7ef5cea..b1ec433 100644 --- a/tests/test_motion.py +++ b/tests/test_motion.py @@ -515,360 +515,6 @@ class AuditMotionTests(BasicTest): + '\n
User C: no
\n \n\nBack' self.assertIn(str.encode(testtext), result.data) -class ProxyManagementTests(BasicTest): - - def setUp(self): - self.init_test() - global user - user='testuser/proxyadmin:*' - global userid - userid=4 - self.db_sampledata() - - def tearDown(self): - pass - - def test_see_proxy(self): - result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) - testtext= 'div class="container">\n
' - self.assertIn(str.encode(testtext), result.data) - testtext= 'proxy granted to:' - self.assertNotIn(str.encode(testtext), result.data) - testtext= 'holds proxy of:' - self.assertNotIn(str.encode(testtext), result.data) - testtext= '\n' - self.assertIn(str.encode(testtext), result.data) - testtext= '\n' - self.assertIn(str.encode(testtext), result.data) - testtext= '\n '\ - + '\n '\ - + '\n \n \n \n '\ - + '
VoterProxy
\n' - self.assertIn(str.encode(testtext), result.data) - testtext= 'Proxy management' - self.assertIn(str.encode(testtext), result.data) - - def test_add_proxy(self): - voter='' - proxy='' - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, voter equals proxy.'), response.data) - - voter='User A' - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, proxy not found.'), response.data) - - voter='User Z' - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, voter not found.'), response.data) - - voter='' - proxy='User B' - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, voter not found.'), response.data) - - voter='User B' - proxy='User B' - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, voter equals proxy.'), response.data) - - voter='User A' - proxy='User Z' - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, proxy not found.'), response.data) - - voter='User A' - proxy='User B' - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 302) - result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) - testtext= '' - self.assertIn(str.encode(testtext), result.data) - testtext= '\n '\ - + '\n '\ - + '\n '\ - + '\n \n \n '\ - + '\n \n \n '\ - + '\n '\ - + '\n
VoterProxy
User AUser B
\n' - self.assertIn(str.encode(testtext), result.data) - - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, proxy allready given.'), response.data) - - voter='User A' - proxy='User C' - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, proxy allready given.'), response.data) - - voter='User C' - proxy='User B' - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 302) - result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) - testtext= '\n '\ - + '\n '\ - + '\n '\ - + '\n \n \n '\ - + '\n \n \n '\ - + '\n \n '\ - + '\n \n \n '\ - + '\n '\ - + '\n
VoterProxy
User AUser B
User CUser B
\n' - self.assertIn(str.encode(testtext), result.data) - testtext= 'proxy granted to:' - self.assertNotIn(str.encode(testtext), result.data) - testtext= 'holds proxy of:' - self.assertNotIn(str.encode(testtext), result.data) - - voter='testuser' - proxy='User B' - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, Max proxy for \'User B\' reached.'), response.data) - - voter='testuser' - proxy='User A' - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 302) - result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) - testtext= '\n '\ - + '\n '\ - + '\n \n \n \n '\ - + '\n \n \n '\ - + '\n \n '\ - + '\n \n \n '\ - + '\n \n '\ - + '\n \n \n '\ - + '\n '\ - + '\n
VoterProxy
testuserUser A
User AUser B
User CUser B
\n' - self.assertIn(str.encode(testtext), result.data) - testtext= 'proxy granted to: User A\n' - self.assertIn(str.encode(testtext), result.data) - testtext= 'holds proxy of:' - self.assertNotIn(str.encode(testtext), result.data) - - voter='User B' - proxy='testuser' - response = self.addProxy(user, voter, proxy) - self.assertEqual(response.status_code, 302) - result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) - testtext= '\n '\ - + '\n '\ - + '\n \n \n \n '\ - + '\n \n \n '\ - + '\n \n '\ - + '\n \n \n '\ - + '\n \n '\ - + '\n \n \n '\ - + '\n \n '\ - + '\n \n \n '\ - + '\n '\ - + '\n
VoterProxy
testuserUser A
User AUser B
User Btestuser
User CUser B
\n' - self.assertIn(str.encode(testtext), result.data) - testtext= 'proxy granted to: User A\n' - self.assertIn(str.encode(testtext), result.data) - testtext= 'holds proxy of: User B\n' - self.assertIn(str.encode(testtext), result.data) - - response = self.revokeProxy(user, userid) - self.assertEqual(response.status_code, 302) - result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) - testtext= '\n '\ - + '\n '\ - + '\n \n \n \n '\ - + '\n \n \n '\ - + '\n \n '\ - + '\n \n \n '\ - + '\n \n '\ - + '\n \n \n '\ - + '\n '\ - + '\n
VoterProxy
testuserUser A
User AUser B
User CUser B
\n' - self.assertIn(str.encode(testtext), result.data) - testtext= 'proxy granted to: User A\n' - self.assertIn(str.encode(testtext), result.data) - testtext= 'holds proxy of:' - self.assertNotIn(str.encode(testtext), result.data) - - response = self.revokeProxy(user, 3) - self.assertEqual(response.status_code, 302) - result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) - testtext= '\n '\ - + '\n '\ - + '\n \n \n \n '\ - + '\n \n \n '\ - + '\n \n '\ - + '\n \n \n '\ - + '\n '\ - + '\n
VoterProxy
User AUser B
User CUser B
\n' - self.assertIn(str.encode(testtext), result.data) - testtext= 'proxy granted to:' - self.assertNotIn(str.encode(testtext), result.data) - testtext= 'holds proxy of:' - self.assertNotIn(str.encode(testtext), result.data) - - result = self.app.post('proxy/revokeall', environ_base={'USER_ROLES': user}, follow_redirects=True) - self.assertEqual(response.status_code, 302) - result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) - testtext= '\n '\ - + '\n '\ - + '\n \n \n \n'\ - + '
VoterProxy
\n' - self.assertNotIn(str.encode(testtext), result.data) - - proxytest="proxytest" - with self.open_DB() as db: - db.prepare("INSERT INTO voter(\"email\", \"host\") VALUES($1, $2)")(proxytest, '127.0.0.1:5001') - result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) - - response = self.addProxy(user, proxytest, 'testuser') - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, voter not found.'), response.data) - - response = self.addProxy(user, 'testuser', proxytest) - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, proxy not found.'), response.data) - - def test_see_proxy_host_only(self): - proxytest="proxytest" - with self.open_DB() as db: - db.prepare("INSERT INTO voter(\"email\", \"host\") VALUES($1, $2)")(proxytest, '127.0.0.1:5001') - result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) - testtext= 'div class="container">\n' - self.assertIn(str.encode(testtext), result.data) - testtext= 'proxy granted to:' - self.assertNotIn(str.encode(testtext), result.data) - testtext= 'holds proxy of:' - self.assertNotIn(str.encode(testtext), result.data) - testtext= '\n' - self.assertIn(str.encode(testtext), result.data) - testtext= '\n' - self.assertIn(str.encode(testtext), result.data) - self.assertNotIn(str.encode(proxytest), result.data) - -class ProxyVoteTests(BasicTest): - - def setUp(self): - self.init_test() - global user - user='testuser/vote:* proxyadmin:*' - self.db_sampledata() - - def tearDown(self): - pass - - def test_proxy_vote(self): - voter='testuser' - proxy='User B' - proxyid=2 - proxyuser='User B/vote:*' - - response = self.addProxy(user, proxy, voter) - self.assertEqual(response.status_code, 302) - - motion='g1.20200402.004' - response = self.createVote(user, motion, 'yes', proxyid) - self.assertEqual(response.status_code, 302) - - # testuser view - result = self.app.get('/motion/' + motion, environ_base={'USER_ROLES': user}, follow_redirects=True) - # own vote without change - testtext= '\n'\ - + '\n'\ - + '\n'\ - + '\n
' - self.assertIn(str.encode(testtext), result.data) - # proxy vote with change - testtext= '
\n'\ - + '\n'\ - + '\n'\ - + '\n
\n' - self.assertIn(str.encode(testtext), result.data) - - # User B view - result = self.app.get('/motion/' + motion, environ_base={'USER_ROLES': proxyuser}, follow_redirects=True) - # own vote without change - testtext= '

My vote

\nGiven by testuser\n'\ - + '
\n'\ - + '\n'\ - + '\n'\ - + '\n
' - self.assertIn(str.encode(testtext), result.data) - - # change vote - response = self.createVote(user, motion, 'no', proxyid) - self.assertEqual(response.status_code, 302) - - result = self.app.get('/motion/' + motion, environ_base={'USER_ROLES': user}, follow_redirects=True) - testtext= '
\n'\ - + '\n'\ - + '\n'\ - + '\n
\n' - self.assertIn(str.encode(testtext), result.data) - - def test_proxy_vote_no_proxy(self): - voter='testuser' - proxy='User B' - # wrong proxy id - proxyid=3 - - response = self.addProxy(user, proxy, voter) - self.assertEqual(response.status_code, 302) - - motion='g1.20200402.004' - response = self.createVote(user, motion, 'yes', proxyid) - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, proxy not found'), response.data) - - # non existing id - proxyid=10000 - - motion='g1.20200402.004' - response = self.createVote(user, motion, 'yes', proxyid) - self.assertEqual(response.status_code, 400) - self.assertIn(str.encode('Error, proxy not found'), response.data) - - def test_proxy_vote_no_voter(self): - voter='User A' - proxy='User B' - proxyid=2 - - response = self.addProxy(user, proxy, voter) - self.assertEqual(response.status_code, 302) - - user1='testuser1/' - motion='g1.20200402.004' - response = self.createVote(user1, motion, 'yes', proxyid) - self.assertEqual(response.status_code, 403) - self.assertIn(str.encode('Forbidden'), response.data) - - if __name__ == "__main__": unittest.main() diff --git a/tests/test_proxy.py b/tests/test_proxy.py new file mode 100644 index 0000000..95bc8f2 --- /dev/null +++ b/tests/test_proxy.py @@ -0,0 +1,349 @@ +from datetime import datetime +from tests.test_basics import BasicTest +import motion +import postgresql +from motion import app + + +class ProxyManagementTests(BasicTest): + + def setUp(self): + self.init_test() + global user + user='testuser/proxyadmin:*' + global userid + userid = 4 + self.db_sampledata() + + def tearDown(self): + pass + + def test_see_proxy(self): + result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) + testtext= 'div class="container">\n
' + self.assertIn(str.encode(testtext), result.data) + testtext= 'proxy granted to:' + self.assertNotIn(str.encode(testtext), result.data) + testtext= 'holds proxy of:' + self.assertNotIn(str.encode(testtext), result.data) + testtext= '\n' + self.assertIn(str.encode(testtext), result.data) + testtext= '\n' + self.assertIn(str.encode(testtext), result.data) + testtext= '\n '\ + + '\n '\ + + '\n \n \n \n '\ + + '
VoterProxy
\n' + self.assertIn(str.encode(testtext), result.data) + testtext= 'Proxy management' + self.assertIn(str.encode(testtext), result.data) + + def test_add_proxy(self): + voter='' + proxy='' + records=0 + response = self.addProxy(user, voter, proxy) + self.assertEqual(response.status_code, 400) + self.assertIn(str.encode('Error, voter equals proxy.'), response.data) + self.assertEqual(records, self.recordCountLog('proxygranted')) + + voter='User A' + response = self.addProxy(user, voter, proxy) + self.assertEqual(response.status_code, 400) + self.assertIn(str.encode('Error, proxy not found.'), response.data) + + voter='User Z' + response = self.addProxy(user, voter, proxy) + self.assertEqual(response.status_code, 400) + self.assertIn(str.encode('Error, voter not found.'), response.data) + + voter='' + proxy='User B' + response = self.addProxy(user, voter, proxy) + self.assertEqual(response.status_code, 400) + self.assertIn(str.encode('Error, voter not found.'), response.data) + + voter='User B' + proxy='User B' + response = self.addProxy(user, voter, proxy) + self.assertEqual(response.status_code, 400) + self.assertIn(str.encode('Error, voter equals proxy.'), response.data) + self.assertEqual(records, self.recordCountLog('proxygranted')) + + voter='User A' + voterid=1 + proxy='User B' + records=1 + response = self.addProxy(user, voter, proxy) + self.assertEqual(response.status_code, 302) + result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) + testtext= '' + self.assertIn(str.encode(testtext), result.data) + testtext= '\n '\ + + '\n '\ + + '\n '\ + + '\n \n \n '\ + + '\n \n \n '\ + + '\n '\ + + '\n
VoterProxy
User AUser B
\n' + self.assertIn(str.encode(testtext), result.data) + self.assertEqual(records, self.recordCountLog('proxygranted')) + self.logRecordDetailsTest('proxygranted', records-1, voterid, 'proxy: 2', userid) + + response = self.addProxy(user, voter, proxy) + self.assertEqual(response.status_code, 400) + self.assertIn(str.encode('Error, proxy allready given.'), response.data) + + voter='User A' + proxy='User C' + response = self.addProxy(user, voter, proxy) + self.assertEqual(response.status_code, 400) + self.assertIn(str.encode('Error, proxy allready given.'), response.data) + self.assertEqual(records, self.recordCountLog('proxygranted')) + + voter='User C' + voterid=3 + proxy='User B' + records=2 + response = self.addProxy(user, voter, proxy) + self.assertEqual(response.status_code, 302) + result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) + testtext= '\n '\ + + '\n '\ + + '\n '\ + + '\n \n \n '\ + + '\n \n \n '\ + + '\n \n '\ + + '\n \n \n '\ + + '\n '\ + + '\n
VoterProxy
User AUser B
User CUser B
\n' + self.assertIn(str.encode(testtext), result.data) + testtext= 'proxy granted to:' + self.assertNotIn(str.encode(testtext), result.data) + testtext= 'holds proxy of:' + self.assertNotIn(str.encode(testtext), result.data) + self.assertEqual(records, self.recordCountLog('proxygranted')) + self.logRecordDetailsTest('proxygranted', records-1, voterid, 'proxy: 2', userid) + + voter='testuser' + proxy='User B' + response = self.addProxy(user, voter, proxy) + self.assertEqual(response.status_code, 400) + self.assertIn(str.encode('Error, Max proxy for \'User B\' reached.'), response.data) + self.assertEqual(records, self.recordCountLog('proxygranted')) + + voter='testuser' + voterid=4 + proxy='User A' + records=3 + response = self.addProxy(user, voter, proxy) + self.assertEqual(response.status_code, 302) + result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) + testtext= '\n '\ + + '\n '\ + + '\n \n \n \n '\ + + '\n \n \n '\ + + '\n \n '\ + + '\n \n \n '\ + + '\n \n '\ + + '\n \n \n '\ + + '\n '\ + + '\n
VoterProxy
testuserUser A
User AUser B
User CUser B
\n' + self.assertIn(str.encode(testtext), result.data) + testtext= 'proxy granted to: User A\n' + self.assertIn(str.encode(testtext), result.data) + testtext= 'holds proxy of:' + self.assertNotIn(str.encode(testtext), result.data) + self.assertEqual(records, self.recordCountLog('proxygranted')) + self.logRecordDetailsTest('proxygranted', records-1, voterid, 'proxy: 1', userid) + + voter='User B' + voterid=2 + proxy='testuser' + records=4 + response = self.addProxy(user, voter, proxy) + self.assertEqual(response.status_code, 302) + result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) + testtext= '\n '\ + + '\n '\ + + '\n \n \n \n '\ + + '\n \n \n '\ + + '\n \n '\ + + '\n \n \n '\ + + '\n \n '\ + + '\n \n \n '\ + + '\n \n '\ + + '\n \n \n '\ + + '\n '\ + + '\n
VoterProxy
testuserUser A
User AUser B
User Btestuser
User CUser B
\n' + self.assertIn(str.encode(testtext), result.data) + testtext= 'proxy granted to: User A\n' + self.assertIn(str.encode(testtext), result.data) + testtext= 'holds proxy of: User B\n' + self.assertIn(str.encode(testtext), result.data) + self.assertEqual(records, self.recordCountLog('proxygranted')) + self.logRecordDetailsTest('proxygranted', records-1, voterid, 'proxy: 4', userid) + + recordsRevoked=0 + self.assertEqual(recordsRevoked, self.recordCountLog('proxyrevoked')) + recordsRevoked=1 + response = self.revokeProxy(user, userid) + self.assertEqual(response.status_code, 302) + result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) + testtext= '\n '\ + + '\n '\ + + '\n \n \n \n '\ + + '\n \n \n '\ + + '\n \n '\ + + '\n \n \n '\ + + '\n \n '\ + + '\n \n \n '\ + + '\n '\ + + '\n
VoterProxy
testuserUser A
User AUser B
User CUser B
\n' + self.assertIn(str.encode(testtext), result.data) + testtext= 'proxy granted to: User A\n' + self.assertIn(str.encode(testtext), result.data) + testtext= 'holds proxy of:' + self.assertNotIn(str.encode(testtext), result.data) + self.assertEqual(recordsRevoked, self.recordCountLog('proxyrevoked')) + self.logRecordDetailsTest('proxyrevoked', recordsRevoked-1, userid, '', userid) + + recordsRevoked=2 + proxyid=3 + response = self.revokeProxy(user, proxyid) + self.assertEqual(response.status_code, 302) + result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) + testtext= '\n '\ + + '\n '\ + + '\n \n \n \n '\ + + '\n \n \n '\ + + '\n \n '\ + + '\n \n \n '\ + + '\n '\ + + '\n
VoterProxy
User AUser B
User CUser B
\n' + self.assertIn(str.encode(testtext), result.data) + testtext= 'proxy granted to:' + self.assertNotIn(str.encode(testtext), result.data) + testtext= 'holds proxy of:' + self.assertNotIn(str.encode(testtext), result.data) + self.assertEqual(recordsRevoked, self.recordCountLog('proxyrevoked')) + self.logRecordDetailsTest('proxyrevoked', recordsRevoked-1, proxyid, '', userid) + + recordsRevokedAll=0 + self.assertEqual(recordsRevokedAll, self.recordCountLog('proxyrevokedall')) + recordsRevokedAll=1 + result = self.app.post('proxy/revokeall', environ_base={'USER_ROLES': user}, follow_redirects=True) + self.assertEqual(response.status_code, 302) + result = self.app.get('proxy', environ_base={'USER_ROLES': user}, follow_redirects=True) + testtext= '\n '\ + + '\n '\ + + '\n \n \n \n '\ + + '
VoterProxy
\n' + self.assertIn(str.encode(testtext), result.data) + self.assertEqual(recordsRevokedAll, self.recordCountLog('proxyrevokedall')) + self.logRecordDetailsTest('proxyrevokedall', recordsRevokedAll-1, userid, '', userid) + +class ProxyVoteTests(BasicTest): + + def setUp(self): + self.init_test() + global user + user='testuser/vote:* proxyadmin:*' + self.db_sampledata() + + def tearDown(self): + pass + + def test_proxy_vote(self): + voter='testuser' + proxy='User B' + proxyid=2 + proxyuser='User B/vote:*' + + response = self.addProxy(user, proxy, voter) + self.assertEqual(response.status_code, 302) + + motion='g1.20200402.004' + response = self.createVote(user, motion, 'yes', proxyid) + self.assertEqual(response.status_code, 302) + + # testuser view + result = self.app.get('/motion/' + motion, environ_base={'USER_ROLES': user}, follow_redirects=True) + # own vote without change + testtext= '\n'\ + + '\n'\ + + '\n'\ + + '\n
' + self.assertIn(str.encode(testtext), result.data) + # proxy vote with change + testtext= '
\n'\ + + '\n'\ + + '\n'\ + + '\n
\n' + self.assertIn(str.encode(testtext), result.data) + + # User B view + result = self.app.get('/motion/' + motion, environ_base={'USER_ROLES': proxyuser}, follow_redirects=True) + # own vote without change + testtext= '

My vote

\nGiven by testuser\n'\ + + '
\n'\ + + '\n'\ + + '\n'\ + + '\n
' + self.assertIn(str.encode(testtext), result.data) + + # change vote + response = self.createVote(user, motion, 'no', proxyid) + self.assertEqual(response.status_code, 302) + + result = self.app.get('/motion/' + motion, environ_base={'USER_ROLES': user}, follow_redirects=True) + testtext= '
\n'\ + + '\n'\ + + '\n'\ + + '\n
\n' + self.assertIn(str.encode(testtext), result.data) + + def test_proxy_vote_no_proxy(self): + voter='testuser' + proxy='User B' + # wrong proxy id + proxyid=3 + + response = self.addProxy(user, proxy, voter) + self.assertEqual(response.status_code, 302) + + motion='g1.20200402.004' + response = self.createVote(user, motion, 'yes', proxyid) + self.assertEqual(response.status_code, 400) + self.assertIn(str.encode('Error, proxy not found'), response.data) + + # non existing id + proxyid=10000 + + motion='g1.20200402.004' + response = self.createVote(user, motion, 'yes', proxyid) + self.assertEqual(response.status_code, 400) + self.assertIn(str.encode('Error, proxy not found'), response.data) + + def test_proxy_vote_no_voter(self): + voter='User A' + proxy='User B' + proxyid=2 + + response = self.addProxy(user, proxy, voter) + self.assertEqual(response.status_code, 302) + + user1='testuser1/' + motion='g1.20200402.004' + response = self.createVote(user1, motion, 'yes', proxyid) + self.assertEqual(response.status_code, 403) + self.assertIn(str.encode('Forbidden'), response.data) \ No newline at end of file diff --git a/tests/test_user_api.py b/tests/test_user_api.py index f1819f8..521c1f2 100644 --- a/tests/test_user_api.py +++ b/tests/test_user_api.py @@ -5,14 +5,10 @@ import postgresql from click.testing import CliRunner from motion import create_user +from motion import motion_masking from motion import app -def db_select2(self, sql, parameter, parameter2): - with self.open_DB() as db: - rv = db.prepare(sql)(parameter, parameter2) - return rv - class GeneralTests(BasicTest): def setUp(self): @@ -20,8 +16,6 @@ class GeneralTests(BasicTest): def tearDown(self): pass - - def test_create_user(self): user = 'John Doe' @@ -31,7 +25,7 @@ class GeneralTests(BasicTest): assert result.exit_code == 0 self.assertIn("User 'John Doe' inserted to %s." % host, result.output) - rv = db_select2(self,"SELECT email FROM voter WHERE lower(email)=lower($1) AND host=$2", user, host) + rv = self.db_select2("SELECT email FROM voter WHERE lower(email)=lower($1) AND host=$2", user, host) self.assertIn(user, rv[0].get("email")) result = runner.invoke(create_user, (user, host)) @@ -45,9 +39,127 @@ class GeneralTests(BasicTest): assert result.exit_code == 0 self.assertIn("User 'John Doe' inserted to 127.0.0.1:5001.", result.output) - rv = db_select2(self,"SELECT email FROM voter WHERE lower(email)=lower($1) AND host=$2", user, host) + rv = self.db_select2("SELECT email FROM voter WHERE lower(email)=lower($1) AND host=$2", user, host) self.assertIn(user, rv[0].get("email")) result = runner.invoke(create_user, (user, host)) assert result.exit_code == 0 self.assertIn("User 'John Doe' already exists on 127.0.0.1:5001.", result.output) + + def test_motion_masking(self): + self.db_sampledata() + records=0 + self.assertEqual(records, self.recordCountLog('motionmasking')) + + # test motion not exists + motion = 'g1.20200402.999' + motionreason='http://motiontest.wpia.club/motion/xxx' + host= app.config.get("DEFAULT_HOST") + runner = app.test_cli_runner() + result = runner.invoke(motion_masking, (motion, motionreason, host)) + assert result.exit_code == 0 + self.assertIn("0 record(s) affected by masking of 'g1.20200402.999'.", result.output) + self.assertEqual(records, self.recordCountLog('motionmasking')) + + # test motion with wilcards + motion = 'g1.20200402.00%' + motionreason='http://motiontest.wpia.club/motion/xxx' + host= app.config.get("DEFAULT_HOST") + runner = app.test_cli_runner() + result = runner.invoke(motion_masking, (motion, motionreason, host)) + assert result.exit_code == 0 + self.assertIn("No wildcards allowed for motion entry 'g1.20200402.00%'.", result.output) + self.assertEqual(records, self.recordCountLog('motionmasking')) + + motion = 'g1.2020040%.001' + motionreason='http://motiontest.wpia.club/motion/xxx' + host= app.config.get("DEFAULT_HOST") + runner = app.test_cli_runner() + result = runner.invoke(motion_masking, (motion, motionreason, host)) + assert result.exit_code == 0 + self.assertIn("No wildcards allowed for motion entry 'g1.2020040%.001'.", result.output) + self.assertEqual(records, self.recordCountLog('motionmasking')) + + motion = 'g1.20200402.00_' + motionreason='http://motiontest.wpia.club/motion/xxx' + host= app.config.get("DEFAULT_HOST") + runner = app.test_cli_runner() + result = runner.invoke(motion_masking, (motion, motionreason, host)) + assert result.exit_code == 0 + self.assertIn("No wildcards allowed for motion entry 'g1.20200402.00_'.", result.output) + self.assertEqual(records, self.recordCountLog('motionmasking')) + + motion = 'g1.2020040_.001' + motionreason='http://motiontest.wpia.club/motion/xxx' + host= app.config.get("DEFAULT_HOST") + runner = app.test_cli_runner() + result = runner.invoke(motion_masking, (motion, motionreason, host)) + assert result.exit_code == 0 + self.assertIn("No wildcards allowed for motion entry 'g1.2020040_.001'.", result.output) + self.assertEqual(records, self.recordCountLog('motionmasking')) + + motion = 'g1.2020040_.%' + motionreason='http://motiontest.wpia.club/motion/xxx' + host= app.config.get("DEFAULT_HOST") + runner = app.test_cli_runner() + result = runner.invoke(motion_masking, (motion, motionreason, host)) + assert result.exit_code == 0 + self.assertIn("No wildcards allowed for motion entry 'g1.2020040_.%'.", result.output) + self.assertEqual(records, self.recordCountLog('motionmasking')) + + motion = 'g1.20200402.0\\1' + motionreason='http://motiontest.wpia.club/motion/xxx' + host= app.config.get("DEFAULT_HOST") + runner = app.test_cli_runner() + result = runner.invoke(motion_masking, (motion, motionreason, host)) + assert result.exit_code == 0 + self.assertIn("No wildcards allowed for motion entry 'g1.20200402.0\\1'.", result.output) + self.assertEqual(records, self.recordCountLog('motionmasking')) + + motion = 'g1.2020040\.001' + motionreason='http://motiontest.wpia.club/motion/xxx' + host= app.config.get("DEFAULT_HOST") + runner = app.test_cli_runner() + result = runner.invoke(motion_masking, (motion, motionreason, host)) + assert result.exit_code == 0 + self.assertIn("No wildcards allowed for motion entry 'g1.2020040\\.001'.", result.output) + self.assertEqual(records, self.recordCountLog('motionmasking')) + + # test masking single motion + sql = "SELECT id FROM motion WHERE content LIKE $1" + motion = 'g1.20200402.001' + motionreason='http://motiontest.wpia.club/motion/xxx' + runner = app.test_cli_runner() + result = runner.invoke(motion_masking, (motion, motionreason, host)) + assert result.exit_code == 0 + self.assertIn("1 record(s) affected by masking of 'g1.20200402.001'.", result.output) + self.assertIn("1 record(s) updated by masking of 'g1.20200402.001'.", result.output) + records=1 + self.assertEqual(records, self.recordCountLog('motionmasking')) + self.logRecordDetailsTest('motionmasking', records-1, 0, + "1 motion(s) masked on base of motion http://motiontest.wpia.club/motion/xxx with motion identifier 'g1.20200402.001' on host 127.0.0.1:5000", 0) + self.assertEqual(1, self.recordCount(sql, '%' + motionreason +'%')) + + # test masking muliple motions + motion = 'g1.20200402' + motionreason='http://motiontest.wpia.club/motion/1xxx' + runner = app.test_cli_runner() + result = runner.invoke(motion_masking, (motion, motionreason, host)) + assert result.exit_code == 0 + self.assertIn("4 record(s) affected by masking of 'g1.20200402'.", result.output) + self.assertIn("4 record(s) updated by masking of 'g1.20200402'.", result.output) + records=2 + self.assertEqual(records, self.recordCountLog('motionmasking')) + self.logRecordDetailsTest('motionmasking', records-1, 0, + "4 motion(s) masked on base of motion http://motiontest.wpia.club/motion/1xxx with motion identifier 'g1.20200402' on host 127.0.0.1:5000", 0) + self.assertEqual(4, self.recordCount(sql, '%' + motionreason +'%')) + + # test different host + motion = 'g1.20200402.001' + motionreason='http://motiontest.wpia.club/motion/xxx' + host= '127.0.0.1:5001' + runner = app.test_cli_runner() + result = runner.invoke(motion_masking, (motion, motionreason, host)) + assert result.exit_code == 0 + self.assertIn("0 record(s) affected by masking of 'g1.20200402.001'.", result.output) + self.assertEqual(records, self.recordCountLog('motionmasking'))