Remove global session object #24

Merged: 1 commit, Jan 23, 2024
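This PR removes the module-level `session` and `query` objects from `librarian_server.database` and replaces them with a `get_session()` factory. Each background task now opens a session in `on_call()` and threads it explicitly through a new `core()` method, `get_store()`, and `fail_transfer()`. The factory itself is not part of this diff, so the sketch below is a minimal assumed shape, not the merged implementation.

# Assumed sketch of librarian_server.database.get_session (not shown in this
# diff). In SQLAlchemy 1.4+, Session is itself a context manager that closes
# on exit, so `with get_session() as session:` works as the tasks use it.
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

engine = create_engine("sqlite:///librarian.db")  # placeholder URL for illustration
SessionMaker = sessionmaker(bind=engine)

def get_session() -> Session:
    """Return a fresh Session bound to the application engine."""
    return SessionMaker()

Compared to a process-wide session, this keeps each task's transaction lifetime explicit and lets tests inject their own session.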
1 change: 1 addition & 0 deletions alembic/versions/71df5b41ae41_initial_schema.py

@@ -113,6 +113,7 @@ def upgrade():
         ),
         Column("status", Enum(TransferStatus), nullable=False),
         Column("uploader", String(256), nullable=False),
+        Column("upload_name", String(256), nullable=False),
         Column("source", String(256), nullable=False),
         Column("transfer_size", BigInteger, nullable=False),
         Column("transfer_checksum", String(256), nullable=False),
22 changes: 15 additions & 7 deletions librarian_background/check_integrity.py

@@ -9,8 +9,10 @@
 
 from schedule import CancelJob
 
-from librarian_server.database import session, query
 from librarian_server.orm import StoreMetadata, Instance
+from librarian_server.database import get_session
+
+from sqlalchemy.orm import Session
 
 
 logger = logging.getLogger("schedule")
@@ -26,9 +28,9 @@ class CheckIntegrity(Task):
     age_in_days: int
     "Age in days of the files to check. I.e. only check files younger than this (we assume older files are fine as they've been checked before)"
 
-    def get_store(self) -> StoreMetadata:
+    def get_store(self, session: Session) -> StoreMetadata:
         possible_metadata = (
-            query(StoreMetadata).filter(StoreMetadata.name == self.store_name).first()
+            session.query(StoreMetadata).filter_by(name=self.store_name).first()
         )
 
         if not possible_metadata:
@@ -37,8 +39,15 @@ def get_store(self) -> StoreMetadata:
         return possible_metadata
 
     def on_call(self):
+        with get_session() as session:
+            return self.core(session=session)
+
+    def core(self, session: Session):
+        """
+        Frame this out with the session so that it is automatically closed.
+        """
         try:
-            store = self.get_store()
+            store = self.get_store(session=session)
         except ValueError:
             # Store doesn't exist. Cancel this job.
             logger.error(
@@ -51,9 +60,8 @@ def on_call(self):
 
         # Now we can query the database for all files that were uploaded in the past age_in_days days.
         files = (
-            query(Instance)
-            .filter(Instance.store == store)
-            .filter(Instance.created_time > start_time)
+            session.query(Instance)
+            .filter(Instance.store == store, Instance.created_time > start_time)
             .all()
         )
 
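One SQLAlchemy subtlety in the hunk above: multiple criteria must be passed to `.filter()` together (or combined with `and_()`). Chaining column expressions with Python's `and` calls `bool()` on the first expression, which SQLAlchemy refuses with a TypeError. A self-contained toy illustration (the `Instance` model here is a stand-in, not the Librarian ORM class):

import datetime

from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Instance(Base):  # toy stand-in model for the demo
    __tablename__ = "instances"
    id = Column(Integer, primary_key=True)
    store_name = Column(String(256))
    created_time = Column(DateTime)

engine = create_engine("sqlite://")  # in-memory database for the demo
Base.metadata.create_all(engine)

start_time = datetime.datetime.now() - datetime.timedelta(days=7)

with sessionmaker(bind=engine)() as session:
    # Correct: criteria passed to .filter() together are joined with SQL AND.
    recent = (
        session.query(Instance)
        .filter(Instance.store_name == "local", Instance.created_time > start_time)
        .all()
    )
    # Buggy: (expr1 and expr2) evaluates bool(expr1), which raises
    # TypeError: Boolean value of this clause is not defined.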
34 changes: 20 additions & 14 deletions librarian_background/create_clone.py

@@ -12,8 +12,10 @@
 from schedule import CancelJob
 from pathlib import Path
 
-from librarian_server.database import session, query
 from librarian_server.orm import StoreMetadata, Instance, CloneTransfer, TransferStatus
+from librarian_server.database import get_session
+
+from sqlalchemy.orm import Session
 
 
 logger = logging.getLogger("schedule")
@@ -33,19 +35,23 @@ class CreateLocalClone(Task):
 
     # TODO: In the future, we could implement a _rolling_ n day clone here, i.e. only keep the last n days of files on the clone_to store.
 
-    def get_store(self, name: str) -> StoreMetadata:
+    def get_store(self, name: str, session: Session) -> StoreMetadata:
         possible_metadata = (
-            query(StoreMetadata).filter(StoreMetadata.name == name).first()
+            session.query(StoreMetadata).filter_by(name=name).first()
         )
 
         if not possible_metadata:
             raise ValueError(f"Store {name} does not exist.")
 
         return possible_metadata
 
     def on_call(self):
+        with get_session() as session:
+            return self.core(session=session)
+
+    def core(self, session: Session):
         try:
-            store_from = self.get_store(self.clone_from)
+            store_from = self.get_store(self.clone_from, session)
         except ValueError:
             # Store doesn't exist. Cancel this job.
             logger.error(
@@ -54,7 +60,7 @@ def on_call(self):
             return CancelJob
 
         try:
-            store_to = self.get_store(self.clone_to)
+            store_to = self.get_store(self.clone_to, session)
         except ValueError:
             # Store doesn't exist. Cancel this job.
             logger.error(
@@ -67,7 +73,7 @@ def on_call(self):
 
         # Now we can query the database for all files that were uploaded in the past age_in_days days.
         instances: list[Instance] = (
-            query(Instance)
+            session.query(Instance)
             .filter(Instance.store == store_from)
             .filter(Instance.created_time > start_time)
             .all()
@@ -81,7 +87,7 @@ def on_call(self):
             # Check if there is a matching instance already on our clone_to store.
             # If there is, we don't need to clone it.
             if (
-                query(Instance)
+                session.query(Instance)
                 .filter(Instance.store == store_to)
                 .filter(Instance.file == instance.file)
                 .first()
@@ -113,7 +119,7 @@ def on_call(self):
                     f"File {instance.file.name} is too large to fit on store {store_to}. Skipping."
                 )
 
-                transfer.fail_transfer()
+                transfer.fail_transfer(session=session)
 
                 all_transfers_successful = False
 
@@ -136,15 +142,15 @@ def on_call(self):
                     f"Failed to transfer file {instance.path} to store {store_to} using transfer manager {transfer_manager}."
                 )
 
-                transfer.fail_transfer()
+                transfer.fail_transfer(session=session)
 
                 continue
             except FileNotFoundError as e:
                 logger.error(
                     f"File {instance.path} does not exist on store {store_from}. Skipping."
                 )
 
-                transfer.fail_transfer()
+                transfer.fail_transfer(session=session)
 
                 all_transfers_successful = False
 
@@ -155,7 +161,7 @@ def on_call(self):
                     f"Failed to transfer file {instance.path} to store {store_to}. Skipping."
                 )
 
-                transfer.fail_transfer()
+                transfer.fail_transfer(session=session)
 
                 all_transfers_successful = False
 
@@ -176,7 +182,7 @@ def on_call(self):
                     f"Expected {instance.file.checksum}, got {path_info.md5}."
                 )
 
-                transfer.fail_transfer()
+                transfer.fail_transfer(session=session)
 
                 store_to.store_manager.unstage(staged_path)
 
@@ -193,7 +199,7 @@ def on_call(self):
                 )
                 store_to.store_manager.unstage(staging_name)
 
-                transfer.fail_transfer()
+                transfer.fail_transfer(session=session)
 
                 all_transfers_successful = False
 
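Every `fail_transfer()` call above now receives the task's session explicitly. The method itself lives in `librarian_server.orm` and is not shown in this diff, so the sketch below is a hypothetical shape: the model name, fields, and enum values are assumptions, not the repository's definitions.

import datetime
import enum

from sqlalchemy import Column, DateTime, Enum, Integer
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class TransferStatus(enum.Enum):  # assumed enum values
    INITIATED = 0
    ONGOING = 1
    FAILED = 2

class CloneTransfer(Base):  # hypothetical stand-in for the real ORM model
    __tablename__ = "clone_transfers"
    id = Column(Integer, primary_key=True)
    status = Column(Enum(TransferStatus), nullable=False)
    end_time = Column(DateTime, nullable=True)

    def fail_transfer(self, session: Session) -> None:
        # Mutate and commit on the session the caller owns, rather than
        # reaching for a module-level global.
        self.status = TransferStatus.FAILED
        self.end_time = datetime.datetime.now()
        session.commit()

Passing the session in makes it unambiguous which transaction the status change commits on.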
39 changes: 25 additions & 14 deletions librarian_background/recieve_clone.py

@@ -12,7 +12,7 @@
 
 from .task import Task
 
-from librarian_server.database import session, query
+from librarian_server.database import get_session
 from librarian_server.orm import (
     File,
     Instance,
@@ -28,6 +28,10 @@
     CloneCompleteResponse,
 )
 
+from typing import TYPE_CHECKING
+
+from sqlalchemy.orm import Session
+
 logger = logging.getLogger("schedule")
 
 
@@ -39,15 +43,21 @@ class RecieveClone(Task):
     deletion_policy: DeletionPolicy = DeletionPolicy.DISALLOWED
 
     def on_call(self):
+        with get_session() as session:
+            return self.core(session=session)
+
+    def core(self, session: Session):
         """
         Checks for incoming transfers and processes them.
         """
 
         # Find incoming transfers that are ONGOING
-        ongoing_transfers: list[IncomingTransfer] = query(
-            IncomingTransfer, status=TransferStatus.ONGOING
-        ).all()
+        ongoing_transfers: list[IncomingTransfer] = (
+            session.query(IncomingTransfer)
+            .filter_by(status=TransferStatus.ONGOING)
+            .all()
+        )
 
         all_transfers_succeeded = True
 
         if len(ongoing_transfers) == 0:
@@ -118,7 +128,7 @@ def on_call(self):
                         path=path_info.path,
                         file=file,
                         store=store,
-                        deletion_policy=self.deletion_policy
+                        deletion_policy=self.deletion_policy,
                     )
 
                 session.add(file)
@@ -132,9 +142,9 @@ def on_call(self):
                 session.commit()
 
                 # Callback to the source librarian.
-                librarian: Optional[Librarian] = query(
-                    Librarian, name=transfer.source
-                ).first()
+                librarian: Optional[Librarian] = (
+                    session.query(Librarian).filter_by(name=transfer.source).first()
+                )
 
                 if librarian:
                     # Need to call back
@@ -148,10 +158,12 @@ def on_call(self):
                     )
 
                     try:
-                        response: CloneCompleteResponse = librarian.client.do_pydantic_http_post(
-                            endpoint="/api/v2/clone/complete",
-                            request_model=request,
-                            response_model=CloneCompleteResponse,
+                        response: CloneCompleteResponse = (
+                            librarian.client.do_pydantic_http_post(
+                                endpoint="/api/v2/clone/complete",
+                                request_model=request,
+                                response_model=CloneCompleteResponse,
+                            )
                         )
                     except Exception as e:
                         logger.error(
@@ -168,5 +180,4 @@ def on_call(self):
                 logger.info(f"Transfer {transfer.id} has not yet completed. Skipping.")
                 continue
 
-
         return all_transfers_succeeded
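A side benefit of the `on_call()`/`core()` split is testability: a test can drive `core()` with its own session instead of patching a global. A hypothetical pytest sketch (the fixture and constructor arguments are assumed, not taken from the repository's test suite):

from sqlalchemy.orm import Session

def test_receive_clone_runs_with_injected_session(test_session: Session):
    # test_session: assumed fixture yielding a Session bound to a temporary database.
    task = RecieveClone(deletion_policy=DeletionPolicy.DISALLOWED)  # assumed constructor
    result = task.core(session=test_session)  # no global session required
    assert result is not None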
33 changes: 22 additions & 11 deletions librarian_background/send_clone.py

@@ -12,7 +12,7 @@
 from schedule import CancelJob
 from pathlib import Path
 
-from librarian_server.database import session, query
+from librarian_server.database import get_session
 from librarian_server.orm import (
     StoreMetadata,
     Instance,
@@ -30,6 +30,13 @@
     CloneOngoingResponse
 )
 
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from hera_librarian import LibrarianClient
+
+from sqlalchemy.orm import Session
+
 logger = logging.getLogger("schedule")
 
 
@@ -49,13 +56,17 @@ class SendClone(Task):
     "Name of the store to prefer when sending files. If None, we will use whatever store is available for sending that file."
 
     def on_call(self):
+        with get_session() as session:
+            return self.core(session=session)
+
+    def core(self, session: Session):
         """
         Creates uploads to the remote librarian as specified.
         """
         # Before even attempting to do anything, get the information about the librarian and create
         # a client connection to it.
-        librarian: Optional[Librarian] = query(
-            Librarian, Librarian.name == self.destination_librarian
-        ).first()
+        librarian: Optional[Librarian] = (
+            session.query(Librarian).filter_by(name=self.destination_librarian).first()
+        )
 
         if librarian is None:
@@ -79,19 +90,19 @@ def on_call(self):
         age_in_days = datetime.timedelta(days=self.age_in_days)
         oldest_file_age = current_time - age_in_days
 
-        files_without_remote_instances: list[File] = query(
-            File,
-            File.create_time > oldest_file_age,
-            File.remote_instances.any(librarian_name=self.destination_librarian),
+        files_without_remote_instances: list[File] = (
+            session.query(File)
+            .filter(File.create_time > oldest_file_age)
+            .filter(File.remote_instances.any(librarian_name=self.destination_librarian))
        ).all()
 
         logger.info(
             f"Found {len(files_without_remote_instances)} files without remote instances."
         )
 
         if self.store_preference is not None:
-            use_store: StoreMetadata = query(
-                StoreMetadata, StoreMetadata.name == self.store_preference
+            use_store: StoreMetadata = session.query(StoreMetadata).filter_by(
+                name=self.store_preference
             ).first()
 
             if use_store is None:
@@ -159,7 +170,7 @@ def on_call(self):
             )
 
             # Mark the transfer as failed.
-            transfer.fail_transfer()
+            transfer.fail_transfer(session=session)
             continue
 
 
@@ -192,7 +203,7 @@ def on_call(self):
                 f"Failed to transfer file {instance.path} to remote store. Skipping."
            )
 
-            transfer.fail_transfer()
+            transfer.fail_transfer(session=session)
             continue
 
         # Great! We can now mark the transfer as ONGOING in the background.
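The `if TYPE_CHECKING:` guard added to send_clone.py imports `LibrarianClient` for annotations only, so the import is never executed at runtime (presumably to avoid a runtime import cost or cycle between the server and client packages). A minimal sketch of the pattern:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only evaluated by static type checkers, never at runtime.
    from hera_librarian import LibrarianClient

def client_for(librarian) -> "LibrarianClient":
    # String annotation: resolved by the type checker, not at import time.
    return librarian.client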