Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dependencies = [
"sqlalchemy>=1.2.12,<2.0",
"alembic~=1.13",
"urllib3>=1.26",
"rich>=14.3.3",
]

[project.optional-dependencies]
Expand Down
93 changes: 79 additions & 14 deletions src/simdb/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@

import appdirs
import sqlalchemy.orm
from alembic.config import Config as AlembicConfig
from alembic.migration import MigrationContext
from alembic.operations import Operations
from alembic.script import ScriptDirectory
from rich.prompt import Confirm
from sqlalchemy import String, Text, asc, create_engine, desc, func, or_
from sqlalchemy import cast as sql_cast
from sqlalchemy import or_ as sql_or
Expand All @@ -23,11 +28,72 @@
from .models.metadata import MetaData
from .models.simulation import Simulation

_ALEMBIC_INI = Path("alembic.ini")


class DatabaseError(RuntimeError):
pass


class DatabaseUninitializedError(DatabaseError):
pass


class DatabaseOutdatedError(DatabaseError):
pass


def check_migrations(engine) -> str:
"""Check that the database is up-to-date with the latest Alembic migration.

Raises :class:`DatabaseUninitializedError` if the database has not been
initialised at all (i.e. the ``alembic_version`` table is absent), or
:class:`DatabaseOutdatedError` if the database schema is behind the head
revision.
"""
alembic_cfg = AlembicConfig(str(_ALEMBIC_INI))
script = ScriptDirectory.from_config(alembic_cfg)
head_revision = script.get_current_head()

with engine.connect() as conn:
context = MigrationContext.configure(conn)
current_revision = context.get_current_revision()

if current_revision is None:
raise DatabaseUninitializedError(
"The database has not been initialised. "
f"Run 'DATABASE_URL={engine.url} alembic upgrade head' before starting the "
"server. "
)

if current_revision != head_revision:
raise DatabaseOutdatedError(
f"Database schema is out of date: current revision is {current_revision}, "
f"but the latest revision is {head_revision}. "
f"Run 'DATABASE_URL={engine.url} alembic upgrade head' to apply pending "
"migrations. "
)
return current_revision


def run_migrations(engine) -> None:
"""Run the database migrations."""
config = AlembicConfig(_ALEMBIC_INI)
config.set_main_option("script_location", "alembic")
script = ScriptDirectory.from_config(config)

def upgrade(rev, context):
return script._upgrade_revs("head", rev)

with engine.connect() as conn:
context = MigrationContext.configure(
conn, opts={"target_metadata": Base.metadata, "fn": upgrade}
)

with context.begin_transaction(), Operations.context(context):
context.run_migrations()


TYPING = TYPE_CHECKING or "sphinx" in sys.modules

if TYPING:
Expand Down Expand Up @@ -106,12 +172,6 @@ def __init__(self, db_type: DBMS, scopefunc=None, **kwargs) -> None:
self.engine: sqlalchemy.engine.Engine = create_engine(
"sqlite:///{file}".format(**kwargs)
)
with contextlib.closing(self.engine.connect()) as con:
res: sqlalchemy.engine.ResultProxy = con.execute(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT "
"LIKE 'sqlite_%';"
)
new_db = res.rowcount == -1

elif db_type == Database.DBMS.POSTGRESQL:
if "host" not in kwargs:
Expand All @@ -131,11 +191,6 @@ def __init__(self, db_type: DBMS, scopefunc=None, **kwargs) -> None:
pool_pre_ping=True,
pool_recycle=3600,
)
with contextlib.closing(self.engine.connect()) as con:
res: sqlalchemy.engine.ResultProxy = con.execute(
"SELECT * FROM pg_catalog.pg_tables WHERE schemaname = 'public';"
)
new_db = res.rowcount == 0

elif db_type == Database.DBMS.MSSQL:
if "user" not in kwargs:
Expand All @@ -147,12 +202,10 @@ def __init__(self, db_type: DBMS, scopefunc=None, **kwargs) -> None:
self.engine: sqlalchemy.engine.Engine = create_engine(
"mssql+pyodbc://{user}:{password}@{dsnname}".format(**kwargs)
)
new_db = False

else:
raise ValueError("Unknown database type: " + db_type.name)
if new_db:
Base.metadata.create_all(self.engine)

Base.metadata.bind = self.engine
if scopefunc is None:

Expand Down Expand Up @@ -744,4 +797,16 @@ def get_local_db(config: Config) -> Database:
)
db_file.parent.mkdir(parents=True, exist_ok=True)
database = Database(Database.DBMS.SQLITE, file=db_file)
try:
check_migrations(database.engine)
except DatabaseUninitializedError as e:
if Confirm.ask("Local database has not been initialized. Initialize now?"):
run_migrations(database.engine)
else:
raise e
except DatabaseOutdatedError as e:
if Confirm.ask("Local database schema is out of date. Run migrations now?"):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you recommend users create a backup of the existing database (or even create it automatically for them) in case the migration fails?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would not be easy to find database files and restore (if something went wrong) for every user. I would suggest to create database backup automatically and incase if migration fails should be able to rollback.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also good to inform the user is what is backed up and where it stored.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alembic automatically executes the upgrade() function within a single database transaction. If the migration fails at any point, the database engine will immediately roll back the transaction.

The schema will remain safely on the previous revision, and the alembic_version table won't update. Once we fix whatever caused the failure, the user should be able to safely rerun the migration.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea to show the users where we've stored the backups :)

run_migrations(database.engine)
else:
raise e
return database
74 changes: 3 additions & 71 deletions src/simdb/remote/app.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
import logging
import os
from pathlib import Path
from typing import Optional, Type, cast

from alembic.config import Config as AlembicConfig
from alembic.migration import MigrationContext
from alembic.operations import Operations
from alembic.script import ScriptDirectory
from flask import Flask, jsonify, request
from flask.json import JSONDecoder, JSONEncoder
from flask_compress import Compress
from flask_cors import CORS

from simdb.config import Config
from simdb.database.models import Base
from simdb.database.database import check_migrations, run_migrations
from simdb.json import CustomDecoder, CustomEncoder

from .apis import blueprints
Expand All @@ -24,66 +19,6 @@
compress = Compress()


# Path to alembic.ini, located at the project root (two levels above this file's
# package: src/simdb/remote/ -> src/simdb/ -> src/ -> project root)
_ALEMBIC_INI = Path("alembic.ini")


def check_migrations(app: "SimDBApp") -> None:
"""Check that the database is up-to-date with the latest Alembic migration.

Logs a warning if the database is behind the head revision, and raises a
:class:`RuntimeError` if the database has not been initialised at all (i.e.
the ``alembic_version`` table is absent).
"""
alembic_cfg = AlembicConfig(str(_ALEMBIC_INI))
script = ScriptDirectory.from_config(alembic_cfg)
head_revision = script.get_current_head()

engine = app.db.engine
with engine.connect() as conn:
context = MigrationContext.configure(conn)
current_revision = context.get_current_revision()

if current_revision is None:
raise RuntimeError(
"The database has not been initialised. "
f"Run 'DATABASE_URL={engine.url} alembic upgrade head' before starting the "
"server. "
)

if current_revision != head_revision:
raise RuntimeError(
f"Database schema is out of date: current revision is {current_revision}, "
f"but the latest revision is {head_revision}. "
f"Run 'DATABASE_URL={engine.url} alembic upgrade head' to apply pending "
"migrations. "
)
else:
app.logger.info(
"Database schema is up to date (revision %s).", current_revision
)


def run_migrations(app: "SimDBApp") -> None:
"""Run the database migrations."""
config = AlembicConfig(_ALEMBIC_INI)
config.set_main_option("script_location", "alembic")
script = ScriptDirectory.from_config(config)

def upgrade(rev, context):
return script._upgrade_revs("head", rev)

engine = app.db.engine
with engine.connect() as conn:
context = MigrationContext.configure(
conn, opts={"target_metadata": Base.metadata, "fn": upgrade}
)

with context.begin_transaction(), Operations.context(context):
context.run_migrations()


def create_app(
config: Optional[Config] = None, testing=False, debug=False, profile=False
):
Expand Down Expand Up @@ -132,14 +67,11 @@ def index():
for version, blueprint in blueprints.items():
app.register_blueprint(blueprint, url_prefix=f"/{version}")

if not _ALEMBIC_INI.exists():
raise RuntimeError(f"Alembic configuration not found at {_ALEMBIC_INI}.")

if testing:
run_migrations(app)
run_migrations(app.db.engine)

try:
check_migrations(app)
check_migrations(app.db.engine)
except Exception as exc:
app.logger.error("Migration check failed: %s", exc)
raise
Expand Down
Loading