391 lines
12 KiB
Python
391 lines
12 KiB
Python
"""
Environment generation module - replaces generate-env.sh

Provides pure Python implementations for:
- Random word selection from dictionary
- Memorable password generation
- Environment file generation and manipulation
"""
|
|
|
|
import logging
|
|
import os
|
|
import random
|
|
import re
|
|
import secrets
|
|
import shutil
|
|
from dataclasses import asdict, dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class EnvValues:
    """
    Bundle of values written into the .env file for one deployment.

    Field order matters: positional construction and ``asdict`` output
    follow the declaration order below.
    """

    # Hyphen-joined pair of random words, e.g. "word1-word2".
    subdomain: str
    # Base domain the subdomain is attached to.
    domain: str
    # Host name in the form "<subdomain>.<domain>" (no scheme).
    url: str
    # Database name derived from a random string, app name and subdomain.
    db_name: str
    # Database user name, built with the same scheme as db_name.
    db_user: str
    # Memorable password made of hyphen-joined dictionary words.
    db_password: str
    # Value for COMPOSE_PROJECT_NAME.
    compose_project_name: str
|
|
|
|
|
|
class WordGenerator:
    """
    Pure Python implementation of dictionary word selection.

    Words are read lazily from a dictionary file on first use, filtered to
    lowercase ASCII words of 4-10 letters, and cached on the instance.
    Selection uses the ``secrets`` module because the words end up in
    generated passwords; seeding ``random`` therefore does not make the
    output reproducible.
    """

    # Only plain lowercase words of 4-10 letters are eligible.
    _WORD_PATTERN = re.compile(r'^[a-z]{4,10}$')

    def __init__(self, dict_file: Path):
        """
        Initialize word generator

        Args:
            dict_file: Path to dictionary file (e.g., /usr/share/dict/words).
                A plain string is accepted and converted to a Path.
        """
        self._dict_file = Path(dict_file)
        self._words_cache: Optional[List[str]] = None
        self._logger = logging.getLogger(f"{__name__}.WordGenerator")

    def _load_and_filter_words(self) -> List[str]:
        """
        Load dictionary and filter to 4-10 char lowercase words

        Returns:
            List of filtered words, in file order

        Raises:
            FileNotFoundError: If dictionary file doesn't exist
            ValueError: If no valid words found
        """
        if not self._dict_file.exists():
            raise FileNotFoundError(f"Dictionary file not found: {self._dict_file}")

        self._logger.debug("Loading words from %s", self._dict_file)

        # errors='replace' keeps a stray non-UTF-8 byte from aborting the
        # load; the replacement character can never match [a-z], so the
        # affected word is simply filtered out.
        with open(self._dict_file, 'r', encoding='utf-8', errors='replace') as f:
            candidates = (line.strip() for line in f)
            words = [word for word in candidates if self._WORD_PATTERN.match(word)]

        if not words:
            raise ValueError(f"No valid words found in {self._dict_file}")

        self._logger.debug("Loaded %s valid words", len(words))
        return words

    def _get_words(self) -> List[str]:
        """Return the filtered word list, loading and caching it on first use."""
        if self._words_cache is None:
            self._words_cache = self._load_and_filter_words()
        return self._words_cache

    def get_random_word(self) -> str:
        """
        Get single random word from filtered list

        Returns:
            Random word (4-10 chars, lowercase)

        Raises:
            FileNotFoundError: If dictionary file doesn't exist
            ValueError: If no valid words found
        """
        return secrets.choice(self._get_words())

    def get_random_words(self, count: int) -> List[str]:
        """
        Get multiple random words (sampled with replacement)

        Args:
            count: Number of words to retrieve (0 yields an empty list)

        Returns:
            List of random words; the same word may appear more than once

        Raises:
            ValueError: If count is negative or no valid words found
            FileNotFoundError: If dictionary file doesn't exist
        """
        if count < 0:
            raise ValueError(f"count must be non-negative, got {count}")

        words = self._get_words()
        return [secrets.choice(words) for _ in range(count)]
|
|
|
|
|
|
class PasswordGenerator:
    """Generate memorable passwords from dictionary words"""

    def __init__(self, word_generator: "WordGenerator"):
        """
        Initialize password generator

        Args:
            word_generator: WordGenerator instance for word selection; only
                its ``get_random_words(count)`` method is used here
        """
        self._word_generator = word_generator
        self._logger = logging.getLogger(f"{__name__}.PasswordGenerator")

    def generate_memorable_password(self, word_count: int = 3) -> str:
        """
        Generate password from N random dictionary words joined by hyphens

        Args:
            word_count: Number of words to use (default: 3, minimum: 1)

        Returns:
            Password string like "templon-infantly-yielding"

        Raises:
            ValueError: If word_count is less than 1 (that would yield an
                empty password)
        """
        if word_count < 1:
            raise ValueError(f"word_count must be at least 1, got {word_count}")

        words = self._word_generator.get_random_words(word_count)
        password = '-'.join(words)
        # Never log the password itself, only its shape.
        self._logger.debug("Generated %s-word password", word_count)
        return password

    def generate_random_string(self, length: int = 8) -> str:
        """
        Generate random lowercase hexadecimal string using secrets module

        Args:
            length: Length of string to generate (default: 8)

        Returns:
            Random string of exactly ``length`` characters drawn from
            0-9 and a-f

        Raises:
            ValueError: If length is negative
        """
        if length < 0:
            raise ValueError(f"length must be non-negative, got {length}")
        if length == 0:
            return ""

        # Each random byte yields two hex digits; round up for odd lengths
        # and trim the surplus digit.
        return secrets.token_hex((length + 1) // 2)[:length]
|
|
|
|
|
|
class EnvFileGenerator:
    """
    Pure Python .env file manipulation (replaces bash sed logic)

    Generates deployment values, and reads, backs up, updates and restores
    the .env file. Updates are applied line by line so comments, blank
    lines and unrelated entries in the file are left exactly as they were.
    """

    # PostgreSQL limit for identifiers (64 - 1 for null terminator).
    _MAX_DB_IDENTIFIER_LENGTH = 63
    # Used when no app name is passed in and the .env has no usable APP_NAME.
    _DEFAULT_APP_NAME = 'gitea'

    def __init__(
        self,
        env_file: Path,
        word_generator: "WordGenerator",
        password_generator: "PasswordGenerator",
        base_domain: str = "merakit.my",
        app_name: Optional[str] = None
    ):
        """
        Initialize environment file generator

        Args:
            env_file: Path to .env file (a plain string is also accepted)
            word_generator: WordGenerator instance
            password_generator: PasswordGenerator instance
            base_domain: Base domain for URL generation (default: "merakit.my")
            app_name: Application name (default: read from .env or "gitea")
        """
        self._env_file = Path(env_file)
        self._word_generator = word_generator
        self._password_generator = password_generator
        self._base_domain = base_domain
        self._app_name = app_name
        self._logger = logging.getLogger(f"{__name__}.EnvFileGenerator")

    def generate_values(self) -> "EnvValues":
        """
        Generate all environment values

        Returns:
            EnvValues dataclass with all generated values
        """
        self._logger.info("Generating environment values")

        # Explicit app_name wins, then APP_NAME from the .env; an empty
        # APP_NAME= entry falls through to the default as well.
        current_env = self.read_current_env()
        app_name = (
            self._app_name
            or current_env.get('APP_NAME')
            or self._DEFAULT_APP_NAME
        )

        # 1. Generate subdomain: two random words
        word1 = self._word_generator.get_random_word()
        word2 = self._word_generator.get_random_word()
        subdomain = f"{word1}-{word2}"

        # 2. Construct URL (host name only, no scheme)
        url = f"{subdomain}.{self._base_domain}"

        # 3. Generate random string for DB identifiers
        random_str = self._password_generator.generate_random_string(8)

        # 4. Generate DB identifiers with truncation logic
        db_name = self._generate_db_name(random_str, app_name, subdomain)
        db_user = self._generate_db_user(random_str, app_name, subdomain)

        # 5. Generate password (deliberately never logged)
        db_password = self._password_generator.generate_memorable_password(3)

        self._logger.info("Generated values for subdomain: %s", subdomain)
        self._logger.debug("URL: %s", url)
        self._logger.debug("DB_NAME: %s", db_name)
        self._logger.debug("DB_USER: %s", db_user)

        return EnvValues(
            subdomain=subdomain,
            domain=self._base_domain,
            url=url,
            db_name=db_name,
            db_user=db_user,
            db_password=db_password,
            compose_project_name=subdomain
        )

    def _build_db_identifier(self, random_str: str, app_name: str, subdomain: str) -> str:
        """
        Build a DB identifier shared by the database name and user name.

        Hyphens anywhere in the result (app name or subdomain) are replaced
        with underscores for DB compatibility, then the identifier is
        truncated to 63 characters.
        """
        identifier = f"angali_{random_str}_{app_name}_{subdomain}".replace('-', '_')
        return identifier[:self._MAX_DB_IDENTIFIER_LENGTH]

    def _generate_db_name(self, random_str: str, app_name: str, subdomain: str) -> str:
        """
        Format: angali_{random8}_{app}_{subdomain}, truncate to 63 chars

        Args:
            random_str: Random 8-char string
            app_name: Application name
            subdomain: Subdomain with hyphens

        Returns:
            Database name (max 63 chars, hyphens replaced by underscores)
        """
        return self._build_db_identifier(random_str, app_name, subdomain)

    def _generate_db_user(self, random_str: str, app_name: str, subdomain: str) -> str:
        """
        Format: angali_{random8}_{app}_{subdomain}, truncate to 63 chars

        Args:
            random_str: Random 8-char string
            app_name: Application name
            subdomain: Subdomain with hyphens

        Returns:
            Database username (max 63 chars, hyphens replaced by underscores)
        """
        return self._build_db_identifier(random_str, app_name, subdomain)

    @staticmethod
    def _unquote(value: str) -> str:
        """Remove one pair of matching surrounding quotes, if present."""
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            return value[1:-1]
        return value

    def read_current_env(self) -> Dict[str, str]:
        """
        Parse existing .env file into dict

        Blank lines, comment lines (starting with '#') and lines without
        '=' are ignored. Whitespace around keys and values is stripped and
        one pair of matching surrounding quotes is removed from the value.
        When a key appears more than once the last occurrence wins.

        Returns:
            Dictionary of environment variables (empty if the file is missing)
        """
        env_dict: Dict[str, str] = {}

        if not self._env_file.exists():
            self._logger.warning(f"Env file not found: {self._env_file}")
            return env_dict

        with open(self._env_file, 'r', encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip()
                # Skip empty lines and comments
                if not line or line.startswith('#'):
                    continue

                # Parse KEY=VALUE format; split on the first '=' only so
                # values may themselves contain '='
                key, separator, value = line.partition('=')
                if not separator:
                    continue
                env_dict[key.strip()] = self._unquote(value.strip())

        self._logger.debug("Read %s variables from %s", len(env_dict), self._env_file)
        return env_dict

    def backup_env_file(self) -> Path:
        """
        Create timestamped backup of .env file

        The backup is written next to the .env file as
        "<name>.backup.<YYYYmmdd_HHMMSS>" (local time).

        Returns:
            Path to backup file

        Raises:
            FileNotFoundError: If .env file doesn't exist
        """
        if not self._env_file.exists():
            raise FileNotFoundError(f"Cannot backup non-existent file: {self._env_file}")

        # Create backup with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = self._env_file.parent / f"{self._env_file.name}.backup.{timestamp}"

        # copy2 also carries over metadata such as permission bits
        shutil.copy2(self._env_file, backup_path)
        self._logger.info(f"Created backup: {backup_path}")

        return backup_path

    @staticmethod
    def _env_updates(values: "EnvValues") -> Dict[str, str]:
        """Map the generated values to the .env keys they are written under."""
        return {
            'COMPOSE_PROJECT_NAME': values.compose_project_name,
            'SUBDOMAIN': values.subdomain,
            'DOMAIN': values.domain,
            'URL': values.url,
            'DB_NAME': values.db_name,
            'DB_USER': values.db_user,
            'DB_PASSWORD': values.db_password
        }

    @staticmethod
    def _merge_env_lines(lines: List[str], updates: Dict[str, str]) -> List[str]:
        """
        Apply updates to raw .env lines.

        Lines whose key is in ``updates`` are rewritten as KEY=value; every
        other line is kept verbatim. Keys that were not present are
        appended at the end in the order given by ``updates``.
        """
        pending = dict(updates)
        merged: List[str] = []

        for raw_line in lines:
            stripped = raw_line.strip()
            key = None
            if stripped and not stripped.startswith('#') and '=' in stripped:
                key = stripped.split('=', 1)[0].strip()

            if key in updates:
                merged.append(f"{key}={updates[key]}")
                pending.pop(key, None)
            else:
                merged.append(raw_line)

        merged.extend(f"{key}={value}" for key, value in pending.items())
        return merged

    def update_env_file(self, values: "EnvValues", dry_run: bool = False) -> None:
        """
        Update .env file with new values

        Only the lines for the generated keys are rewritten (or appended
        when missing); comments, blank lines and other entries are kept
        verbatim.

        Uses atomic write pattern: write to temp file, then rename. The
        temp file is created owner-only and then given the original file's
        permission bits, so replacing the file does not loosen access to
        the stored password.

        Args:
            values: EnvValues to write
            dry_run: If True, only log what would be done

        Raises:
            FileNotFoundError: If .env file doesn't exist
            RuntimeError: If writing or replacing the file fails
        """
        if not self._env_file.exists():
            raise FileNotFoundError(f"Env file not found: {self._env_file}")

        updates = self._env_updates(values)

        if dry_run:
            self._logger.info(f"[DRY-RUN] Would update {self._env_file} with:")
            for key, value in updates.items():
                # Mask secrets so they never reach the log
                shown = '********' if 'password' in key.lower() else value
                self._logger.info(f" {key}={shown}")
            return

        # Read the current file as raw lines so untouched content survives
        with open(self._env_file, 'r', encoding='utf-8') as f:
            existing_lines = f.read().splitlines()

        merged_lines = self._merge_env_lines(existing_lines, updates)

        # Write atomically: write to temp file, then rename
        temp_file = self._env_file.parent / f"{self._env_file.name}.tmp"

        try:
            # Create the temp file owner-only; it holds the DB password
            fd = os.open(temp_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                f.writelines(f"{line}\n" for line in merged_lines)

            # Carry the original permission bits over to the replacement
            shutil.copymode(self._env_file, temp_file)

            # Atomic rename
            os.replace(temp_file, self._env_file)
            self._logger.info(f"Updated {self._env_file} successfully")

        except Exception as e:
            # Cleanup temp file on error
            if temp_file.exists():
                temp_file.unlink()
            raise RuntimeError(f"Failed to update env file: {e}") from e

    def restore_env_file(self, backup_path: Path) -> None:
        """
        Restore .env from backup

        Args:
            backup_path: Path to backup file

        Raises:
            FileNotFoundError: If backup file doesn't exist
        """
        if not backup_path.exists():
            raise FileNotFoundError(f"Backup file not found: {backup_path}")

        shutil.copy2(backup_path, self._env_file)
        self._logger.info(f"Restored {self._env_file} from {backup_path}")
|