|
|
|
from datetime import timedelta
|
|
|
|
from django.test import TestCase
|
|
|
|
from django.utils import timezone
|
|
|
|
from django.core.management import call_command
|
|
|
|
from django.core import mail
|
|
|
|
from io import StringIO
|
|
|
|
from constance import config as site_config
|
|
|
|
from constance.test import override_config
|
|
|
|
|
|
|
|
from lambdainst.forms import SignupForm
|
|
|
|
from lambdainst.models import VPNUser, User
|
|
|
|
from payments.models import Payment, Subscription
|
|
|
|
|
|
|
|
|
|
|
|
class UserTestMixin:
    """Assertion helpers shared by the user-related test cases."""

    def assertRemaining(self, vpnuser, time, delta=5):
        """Assert that ``vpnuser`` expires ``time`` from now, give or take ``delta`` seconds.

        A falsy ``vpnuser.expiration`` is treated as "expires right now".
        """
        expires_at = vpnuser.expiration
        if not expires_at:
            expires_at = timezone.now()

        remaining = expires_at - timezone.now()
        drift = (remaining - time).total_seconds()
        self.assertAlmostEqual(drift, 0, delta=delta)
|
|
|
|
|
|
|
|
|
|
|
|
class UserModelReferrerTest(TestCase, UserTestMixin):
    """Exercise ``on_payment_confirmed`` for users with and without a referrer."""

    def setUp(self):
        """Create a referrer, a user without one, a referred user and a confirmed payment."""
        self.referrer = User.objects.create_user("ref")

        self.without_ref = User.objects.create_user("aaaa")

        self.with_ref = User.objects.create_user("bbbb")
        self.with_ref.vpnuser.referrer = self.referrer

        payment_fields = {
            "user": self.with_ref,
            "status": "confirmed",
            "amount": 300,
            "time": timedelta(days=30),
        }
        self.payment = Payment.objects.create(**payment_fields)

    def test_no_ref(self):
        """Confirming a payment for a user without a referrer must not raise."""
        self.without_ref.vpnuser.on_payment_confirmed(self.payment)

    def test_ref(self):
        """Confirming a payment marks the referrer as used and credits the referrer."""
        self.with_ref.vpnuser.on_payment_confirmed(self.payment)

        self.assertTrue(self.with_ref.vpnuser.referrer_used)
        self.assertEqual(self.with_ref.vpnuser.referrer, self.referrer)

        # The referrer's remaining time is checked against 30 days with a
        # three-day tolerance, expressed in seconds.
        three_days_in_seconds = 24 * 3600 * 3
        self.assertRemaining(
            self.referrer.vpnuser,
            timedelta(days=30),
            delta=three_days_in_seconds,
        )
|
|
|
|
|
|
|
|
|
|
|
|
class SignupViewTest(TestCase):
    """Cover the signup page: rendering, valid posts, invalid posts and referrals."""

    def test_form(self):
        """GET renders the page with a ``SignupForm`` in the context."""
        resp = self.client.get("/account/signup")

        self.assertEqual(resp.status_code, 200)
        self.assertIsInstance(resp.context["form"], SignupForm)

    def test_post(self):
        """A valid POST redirects to the account page and creates the user."""
        credentials = {
            "username": "test_un",
            "password": "test_pw",
            "password2": "test_pw",
        }
        resp = self.client.post("/account/signup", credentials)
        self.assertRedirects(resp, "/account/")

        created = User.objects.get(username="test_un")
        self.assertTrue(created.check_password("test_pw"))

    def test_post_error(self):
        """Mismatching passwords re-render the form with an error on ``password``."""
        mismatching = {
            "username": "test_un",
            "password": "test_pw",
            "password2": "qsdf",
        }
        resp = self.client.post("/account/signup", mismatching)

        self.assertEqual(resp.status_code, 200)
        self.assertIsInstance(resp.context["form"], SignupForm)
        self.assertFormError(resp, "form", "password", "Passwords are not the same")

    def test_post_referrer(self):
        """Signing up with ``?ref=<id>`` records that user as the referrer."""
        ref = User.objects.create_user("referrer")

        signup_url = f"/account/signup?ref={ref.id:d}"
        credentials = {
            "username": "test_un",
            "password": "test_pw",
            "password2": "test_pw",
        }
        resp = self.client.post(signup_url, credentials)
        self.assertRedirects(resp, "/account/")

        created = User.objects.get(username="test_un")
        self.assertTrue(created.check_password("test_pw"))
        self.assertEqual(created.vpnuser.referrer, ref)
|
|
|
|
|
|
|
|
|
|
|
|
class AccountViewsTest(TestCase, UserTestMixin):
    """Cover the account pages and the settings form for a logged-in user."""

    def setUp(self):
        """Create the ``test`` user and log the test client in as that user."""
        User.objects.create_user("test", None, "test_pw")
        self.client.login(username="test", password="test_pw")

    def _post_settings(self, **fields):
        """POST ``fields`` to the settings page and return the response."""
        return self.client.post("/account/settings", fields)

    def _stored_user(self):
        """Fetch the ``test`` user fresh from the database."""
        return User.objects.get(username="test")

    def test_account(self):
        """The account index page renders."""
        resp = self.client.get("/account/")
        self.assertEqual(resp.status_code, 200)

    def test_settings_form(self):
        """The settings page renders."""
        resp = self.client.get("/account/settings")
        self.assertEqual(resp.status_code, 200)

    def print_message(self, response):
        """Debug helper: print every message attached to ``response``'s request."""
        from django.contrib.messages import get_messages

        for m in get_messages(response.wsgi_request):
            print(f"[message: {m.message!r} level={m.level} tags={m.tags!r}]")

    def test_settings_post_email(self):
        """The e-mail is updated when the current password is correct."""
        resp = self._post_settings(
            action="email",
            current_password="test_pw",
            email="new_email@example.com",
        )
        self.assertEqual(resp.status_code, 302)

        self.assertEqual(self._stored_user().email, "new_email@example.com")

    def test_settings_post_email_fail(self):
        """The e-mail is left alone when the current password is wrong."""
        resp = self._post_settings(
            action="email",
            current_password="not_test_pw",
            email="new_email@example.com",
        )
        self.assertEqual(resp.status_code, 302)

        self.assertNotEqual(self._stored_user().email, "new_email@example.com")

    def test_settings_post_pw(self):
        """The password is changed when the current one is correct and both copies match."""
        resp = self._post_settings(
            action="password",
            current_password="test_pw",
            password="new_test_pw",
            password2="new_test_pw",
        )
        self.assertEqual(resp.status_code, 302)

        self.assertTrue(self._stored_user().check_password("new_test_pw"))

    def test_settings_post_pw_fail(self):
        """The password survives a wrong current password and a mismatching confirmation."""
        # First attempt: wrong current password.
        resp = self._post_settings(
            action="password",
            current_password="oops",
            password="new_test_pw",
            password2="new_test_pw",
        )
        self.assertEqual(resp.status_code, 302)

        # Second attempt: correct current password, but the two new passwords differ.
        resp = self._post_settings(
            action="password",
            current_password="test_pw",
            password="new_test_pw2",
            password2="new_test_pw_qsdfg",
        )
        self.assertEqual(resp.status_code, 302)

        stored = self._stored_user()
        self.assertFalse(stored.check_password("new_test_pw"))
        self.assertFalse(stored.check_password("new_test_pw2"))
        self.assertTrue(stored.check_password("test_pw"))
|
|
|
|
|
|
|
|
|
|
|
|
class CACrtViewTest(TestCase):
    """Check that ``/ca.crt`` serves the configured CA certificate."""

    def test_ca_crt(self):
        """The response body is the ``OPENVPN_CA`` setting, served as an X.509 CA cert."""
        expected_type = "application/x-x509-ca-cert"

        with self.settings(OPENVPN_CA="test ca"):
            resp = self.client.get("/ca.crt")

            self.assertEqual(resp.status_code, 200)
            self.assertEqual(resp["Content-Type"], expected_type)
            self.assertEqual(resp.content, b"test ca")
|
|
|
|
|
|
|
|
|
|
|
|
def email_text(body):
    """Return ``body`` with newlines and non-breaking spaces turned into plain spaces.

    Lets tests match a phrase regardless of how the e-mail body was wrapped.
    """
    flattened = body
    for char in ("\n", "\xa0"):  # newline, nbsp
        flattened = flattened.replace(char, " ")
    return flattened
|
|
|
|
|
|
|
|
|
|
|
|
class ExpireNotifyTest(TestCase):
    """Exercise the ``expire_notify`` management command."""

    def setUp(self):
        """Nothing shared: every test builds its own user."""

    def _make_user(self):
        """Create the user every test in this class works with."""
        return User.objects.create_user("test_username", "test@example.com", "testpw")

    def _run_expire_notify(self):
        """Run the ``expire_notify`` command with its stdout captured."""
        captured = StringIO()
        call_command("expire_notify", stdout=captured)

    def _reload_user(self):
        """Fetch the test user fresh from the database."""
        return User.objects.get(username="test_username")

    def test_notify_first(self):
        """A user with two days of paid time gets a single "expire in 1 day" notice."""
        account = self._make_user()
        account.vpnuser.add_paid_time(timedelta(days=2), "initial")
        account.vpnuser.save()

        self._run_expire_notify()

        self.assertEqual(len(mail.outbox), 1)
        notice = mail.outbox[0]
        self.assertEqual(notice.to, ["test@example.com"])
        self.assertIn("expire in 1 day", email_text(notice.body))

        account = self._reload_user()
        self.assertAlmostEqual(
            account.vpnuser.last_expiry_notice,
            timezone.now(),
            delta=timedelta(minutes=1),
        )

    def test_notify_second(self):
        """A user last notified two days ago, with one day left, is notified again."""
        account = self._make_user()
        account.vpnuser.last_expiry_notice = timezone.now() - timedelta(days=2)
        account.vpnuser.add_paid_time(timedelta(days=1), "initial")
        account.vpnuser.save()

        self._run_expire_notify()

        self.assertEqual(len(mail.outbox), 1)
        notice = mail.outbox[0]
        self.assertEqual(notice.to, ["test@example.com"])
        self.assertIn("expire in 23 hours, 59 minutes", email_text(notice.body))

        account = self._reload_user()
        self.assertAlmostEqual(
            account.vpnuser.last_expiry_notice,
            timezone.now(),
            delta=timedelta(minutes=1),
        )

    def test_notify_subscription(self):
        """No notice is sent to a user who has an active subscription."""
        account = self._make_user()
        account.vpnuser.add_paid_time(timedelta(days=2), "initial")
        account.vpnuser.save()

        Subscription(user=account, backend_id="paypal", status="active").save()

        self._run_expire_notify()
        self.assertEqual(len(mail.outbox), 0)

        account = self._reload_user()
        # FIXME:
        # self.assertNotAlmostEqual(u.vpnuser.last_expiry_notice, timezone.now(),
        #                           delta=timedelta(minutes=1))

    def test_notify_subscription_new(self):
        """A subscription whose status is still "new" does not suppress the notice."""
        account = self._make_user()
        account.vpnuser.add_paid_time(timedelta(days=2), "initial")
        account.vpnuser.last_expiry_notice = timezone.now() - timedelta(days=5)
        account.vpnuser.save()

        Subscription(user=account, backend_id="paypal", status="new").save()

        self._run_expire_notify()
        self.assertEqual(len(mail.outbox), 1)

        account = self._reload_user()
        # FIXME:
        # self.assertNotAlmostEqual(u.vpnuser.last_expiry_notice, timezone.now(),
        #                           delta=timedelta(minutes=1))
|