Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
231 lines
7.8 KiB
Python
231 lines
7.8 KiB
Python
import itertools
|
||
import os
|
||
import time
|
||
|
||
from datetime import datetime
|
||
from django.conf import settings
|
||
from django.contrib.staticfiles.testing import StaticLiveServerTestCase
|
||
from channels.testing import ChannelsLiveServerTestCase
|
||
from pathlib import Path
|
||
from selenium import webdriver
|
||
from selenium.common.exceptions import WebDriverException
|
||
from selenium.webdriver.common.by import By
|
||
|
||
from .container_commands import create_session_on_server, reset_database
|
||
from .management.commands.create_session import create_pre_authenticated_session
|
||
from apps.applets.models import Applet
|
||
|
||
|
||
|
||
# Seconds the `wait` decorator keeps retrying before re-raising the last error.
MAX_WAIT = 10
|
||
# "screendumps" directory beside this file; failure screenshots and HTML dumps are written here.
SCREEN_DUMP_LOCATION = Path(__file__).absolute().parent / "screendumps"
|
||
|
||
|
||
# Decorator fns
|
||
def wait(fn):
    """Decorate *fn* so that it is retried until it stops failing.

    The wrapped callable is invoked repeatedly, sleeping 0.5 seconds between
    attempts, for as long as it raises ``AssertionError`` or
    ``WebDriverException``. Once more than ``MAX_WAIT`` seconds have elapsed
    since the first attempt, the most recent exception is re-raised. Any
    other exception type propagates immediately.

    Args:
        fn: The callable to retry.

    Returns:
        A wrapper with the same call signature that returns whatever *fn*
        returns on its first successful call.
    """
    # Imported locally so this helper does not rely on a file-level import.
    import functools

    @functools.wraps(fn)  # keep the decorated function's name and docstring
    def modified_fn(*args, **kwargs):
        # A monotonic clock keeps the timeout correct if the wall clock jumps.
        start_time = time.monotonic()
        while True:
            try:
                return fn(*args, **kwargs)
            except (AssertionError, WebDriverException):
                if time.monotonic() - start_time > MAX_WAIT:
                    # Bare raise preserves the original traceback.
                    raise
                time.sleep(0.5)

    return modified_fn
|
||
|
||
# Functional Tests
|
||
class FunctionalTest(StaticLiveServerTestCase):
    """Base class for Selenium functional tests driven through Firefox.

    ``setUp`` opens a fresh browser for every test. When the ``TEST_SERVER``
    environment variable is set, the tests target ``http://<TEST_SERVER>``
    instead of the local live server. When a test fails, ``tearDown`` saves
    a screenshot and the page HTML under ``SCREEN_DUMP_LOCATION``.
    """

    # Helper methods
    def _make_browser(self, width=1366, height=900):
        """Create a Firefox instance sized to width×height.

        The browser runs headless when the ``HEADLESS`` environment variable
        is set to a non-empty value.
        """
        options = webdriver.FirefoxOptions()
        if os.environ.get("HEADLESS"):
            options.add_argument("--headless")
        browser = webdriver.Firefox(options=options)
        browser.set_window_size(width, height)
        return browser

    def setUp(self):
        """Start the browser and prepare the server and data for a test."""
        self.browser = self._make_browser(1366, 900)
        try:
            self.test_server = os.environ.get("TEST_SERVER")
            if self.test_server:
                self.live_server_url = 'http://' + self.test_server
                reset_database(self.test_server)
            Applet.objects.get_or_create(
                slug="new-note", defaults={"name": "New Note"}
            )
        except Exception:
            # unittest does not call tearDown() when setUp() fails, so the
            # browser must be closed here or the Firefox process is leaked.
            self.browser.quit()
            raise

    def tearDown(self):
        """Save failure artefacts (if any) and always close the browser."""
        try:
            if self._test_has_failed():
                # exist_ok avoids a race between checking and creating.
                SCREEN_DUMP_LOCATION.mkdir(parents=True, exist_ok=True)
                self.take_screenshot()
                self.dump_html()
        finally:
            # Runs even if dumping raised, so the browser is never leaked.
            self.browser.quit()
            super().tearDown()

    def _test_has_failed(self):
        """Return True if this test appears in the result's failures or errors.

        Returns False when the lookup raises ``TypeError``.
        """
        try:
            return any(
                failure[0] == self
                for failure in itertools.chain(
                    self._outcome.result.failures, self._outcome.result.errors
                )
            )
        except TypeError:
            return False

    def take_screenshot(self):
        """Save a PNG screenshot of the browser under SCREEN_DUMP_LOCATION."""
        path = SCREEN_DUMP_LOCATION / self._get_filename("png")
        print("screendumping to", path)
        self.browser.get_screenshot_as_file(str(path))

    def dump_html(self):
        """Write the current page source under SCREEN_DUMP_LOCATION as UTF-8."""
        path = SCREEN_DUMP_LOCATION / self._get_filename("html")
        print("dumping page html to", path)
        path.write_text(self.browser.page_source, encoding="utf-8")

    def _get_filename(self, extension):
        """Build ``<Class>.<test method>-<timestamp>.<extension>``.

        The timestamp is the current local time in ISO format with every
        ``:`` replaced by ``.``.
        """
        timestamp = datetime.now().isoformat().replace(":", ".")
        return (
            f"{self.__class__.__name__}.{self._testMethodName}-{timestamp}.{extension}"
        )

    @wait
    def wait_for(self, fn):
        """Call ``fn()`` and return its result, retried by the ``wait`` decorator."""
        return fn()

    def wait_for_slow(self, fn, timeout=30):
        """Call ``fn()`` until it succeeds, allowing a caller-chosen timeout.

        Retries every 0.5 seconds while ``fn`` raises ``AssertionError`` or
        ``WebDriverException``; once more than *timeout* seconds have elapsed
        the most recent exception is re-raised.
        """
        start_time = time.monotonic()
        while True:
            try:
                return fn()
            except (AssertionError, WebDriverException):
                if time.monotonic() - start_time > timeout:
                    raise
                time.sleep(0.5)

    def create_pre_authenticated_session(self, email):
        """Create a session for *email* and install its cookie in the browser.

        The session key comes from ``create_session_on_server`` when a test
        server is configured, otherwise from the imported
        ``create_pre_authenticated_session`` helper.
        """
        if self.test_server:
            session_key = create_session_on_server(self.test_server, email)
        else:
            # Inside a method body this bare name resolves to the module-level
            # import, not to this method of the same name.
            session_key = create_pre_authenticated_session(email)
        ## to set a cookie we need to first visit the domain
        ## 404 pages load the quickest!
        self.browser.get(self.live_server_url + "/404_no_such_url/")
        self.browser.add_cookie(
            dict(
                name=settings.SESSION_COOKIE_NAME,
                value=session_key,
                path="/",
            )
        )

    def confirm_guard(self, browser=None):
        """Click the ``.guard-yes`` button inside the active guard portal.

        Args:
            browser: Browser to act on; defaults to ``self.browser``.
        """
        b = browser or self.browser

        def _click():
            btn = b.find_element(By.CSS_SELECTOR, "#id_guard_portal.active .guard-yes")
            b.execute_script("arguments[0].click()", btn)

        # Retried until the element can be found and clicked.
        self.wait_for(_click)

    @wait
    def wait_to_be_logged_in(self, email):
        """Wait until the logout element exists and the navbar shows *email*."""
        self.browser.find_element(By.CSS_SELECTOR, "#id_logout")
        navbar = self.browser.find_element(By.CSS_SELECTOR, ".navbar")
        self.assertIn(email, navbar.text)

    @wait
    def wait_to_be_logged_out(self, email):
        """Wait until the email input exists and the navbar omits *email*."""
        # This lookup was previously wrapped in a lambda that was never
        # called, so the presence of the email input was not being checked.
        self.browser.find_element(By.CSS_SELECTOR, "input[name=email]")
        navbar = self.browser.find_element(By.CSS_SELECTOR, ".navbar")
        self.assertNotIn(email, navbar.text)
|
||
|
||
|
||
class ChannelsFunctionalTest(ChannelsLiveServerTestCase):
    """Like FunctionalTest but backed by daphne so WebSocket connections work.

    ``setUp`` opens a fresh Firefox browser for every test. When the
    ``TEST_SERVER`` environment variable is set, the tests target
    ``http://<TEST_SERVER>``. When a test fails, ``tearDown`` saves a
    screenshot and the page HTML under ``SCREEN_DUMP_LOCATION``.
    """

    serve_static = True

    def setUp(self):
        """Start the browser and prepare the server and data for a test."""
        options = webdriver.FirefoxOptions()
        headless = os.environ.get("HEADLESS")
        if headless:
            options.add_argument("--headless")
        self.browser = webdriver.Firefox(options=options)
        try:
            self.browser.set_window_size(1366, 900)
            self.test_server = os.environ.get("TEST_SERVER")
            if self.test_server:
                self.live_server_url = 'http://' + self.test_server
                reset_database(self.test_server)
            Applet.objects.get_or_create(
                slug="new-note", defaults={"name": "New Note"}
            )
        except Exception:
            # unittest does not call tearDown() when setUp() fails, so the
            # browser must be closed here or the Firefox process is leaked.
            self.browser.quit()
            raise

    def tearDown(self):
        """Save failure artefacts (if any) and always close the browser."""
        try:
            if self._test_has_failed():
                # exist_ok avoids a race between checking and creating.
                SCREEN_DUMP_LOCATION.mkdir(parents=True, exist_ok=True)
                self.take_screenshot()
                self.dump_html()
        finally:
            # Runs even if dumping raised, so the browser is never leaked.
            self.browser.quit()
            super().tearDown()

    def _test_has_failed(self):
        """Return True if this test appears in the result's failures or errors.

        Returns False when the lookup raises ``TypeError``.
        """
        try:
            return any(
                failure[0] == self
                for failure in itertools.chain(
                    self._outcome.result.failures, self._outcome.result.errors
                )
            )
        except TypeError:
            return False

    def take_screenshot(self):
        """Save a PNG screenshot of the browser under SCREEN_DUMP_LOCATION."""
        path = SCREEN_DUMP_LOCATION / self._get_filename("png")
        print("screendumping to", path)
        self.browser.get_screenshot_as_file(str(path))

    def dump_html(self):
        """Write the current page source under SCREEN_DUMP_LOCATION as UTF-8."""
        path = SCREEN_DUMP_LOCATION / self._get_filename("html")
        print("dumping page html to", path)
        path.write_text(self.browser.page_source, encoding="utf-8")

    def _get_filename(self, extension):
        """Build ``<Class>.<test method>-<timestamp>.<extension>``.

        The timestamp is the current local time in ISO format with every
        ``:`` replaced by ``.``.
        """
        timestamp = datetime.now().isoformat().replace(":", ".")
        return (
            f"{self.__class__.__name__}.{self._testMethodName}-{timestamp}.{extension}"
        )

    @wait
    def wait_for(self, fn, browser=None):
        """Call ``fn()`` and return its result, retried by the ``wait`` decorator.

        The *browser* argument is accepted but not used by this method.
        """
        return fn()

    def wait_for_slow(self, fn, timeout=30, browser=None):
        """Call ``fn()`` until it succeeds, allowing a caller-chosen timeout.

        Retries every 0.5 seconds while ``fn`` raises ``AssertionError`` or
        ``WebDriverException``; once more than *timeout* seconds have elapsed
        the most recent exception is re-raised. The *browser* argument is
        accepted but not used by this method.
        """
        start_time = time.monotonic()
        while True:
            try:
                return fn()
            except (AssertionError, WebDriverException):
                if time.monotonic() - start_time > timeout:
                    raise
                time.sleep(0.5)

    def confirm_guard(self, browser=None):
        """Click the ``.guard-yes`` button inside the active guard portal.

        Args:
            browser: Browser to act on; defaults to ``self.browser``.
        """
        b = browser or self.browser

        def _click():
            btn = b.find_element(By.CSS_SELECTOR, "#id_guard_portal.active .guard-yes")
            b.execute_script("arguments[0].click()", btn)

        # Retried until the element can be found and clicked.
        self.wait_for(_click)

    def create_pre_authenticated_session(self, email):
        """Create a session for *email* and install its cookie in the browser.

        The session key comes from ``create_session_on_server`` when a test
        server is configured, otherwise from the imported
        ``create_pre_authenticated_session`` helper.
        """
        if self.test_server:
            session_key = create_session_on_server(self.test_server, email)
        else:
            # Inside a method body this bare name resolves to the module-level
            # import, not to this method of the same name.
            session_key = create_pre_authenticated_session(email)
        # A page on the target domain must be loaded before a cookie for it
        # can be added.
        self.browser.get(self.live_server_url + "/404_no_such_url/")
        self.browser.add_cookie(
            dict(
                name=settings.SESSION_COOKIE_NAME,
                value=session_key,
                path="/",
            )
        )
|