Skip to content

Commit

Permalink
Merge pull request #24 from simonsobs/JBorrow/issue12
Browse files Browse the repository at this point in the history
Remove global session object
  • Loading branch information
plaplant authored Jan 23, 2024
2 parents a44278b + 49b9f8a commit 69732c1
Show file tree
Hide file tree
Showing 20 changed files with 502 additions and 443 deletions.
1 change: 1 addition & 0 deletions alembic/versions/71df5b41ae41_initial_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def upgrade():
),
Column("status", Enum(TransferStatus), nullable=False),
Column("uploader", String(256), nullable=False),
Column("upload_name", String(256), nullable=False),
Column("source", String(256), nullable=False),
Column("transfer_size", BigInteger, nullable=False),
Column("transfer_checksum", String(256), nullable=False),
Expand Down
22 changes: 15 additions & 7 deletions librarian_background/check_integrity.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@

from schedule import CancelJob

from librarian_server.database import session, query
from librarian_server.orm import StoreMetadata, Instance
from librarian_server.database import get_session

from sqlalchemy.orm import Session


logger = logging.getLogger("schedule")
Expand All @@ -26,9 +28,9 @@ class CheckIntegrity(Task):
age_in_days: int
"Age in days of the files to check. I.e. only check files younger than this (we assume older files are fine as they've been checked before)"

def get_store(self) -> StoreMetadata:
def get_store(self, session: Session) -> StoreMetadata:
possible_metadata = (
query(StoreMetadata).filter(StoreMetadata.name == self.store_name).first()
session.query(StoreMetadata).filter_by(name=self.store_name).first()
)

if not possible_metadata:
Expand All @@ -37,8 +39,15 @@ def get_store(self) -> StoreMetadata:
return possible_metadata

def on_call(self):
with get_session() as session:
return self.core(session=session)

def core(self, session: Session):
"""
Frame this out with the session so that it is automatically closed.
"""
try:
store = self.get_store()
store = self.get_store(session=session)
except ValueError:
# Store doesn't exist. Cancel this job.
logger.error(
Expand All @@ -51,9 +60,8 @@ def on_call(self):

# Now we can query the database for all files that were uploaded in the past age_in_days days.
files = (
query(Instance)
.filter(Instance.store == store)
.filter(Instance.created_time > start_time)
session.query(Instance)
.filter(Instance.store == store and Instance.created_time > start_time)
.all()
)

Expand Down
34 changes: 20 additions & 14 deletions librarian_background/create_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
from schedule import CancelJob
from pathlib import Path

from librarian_server.database import session, query
from librarian_server.orm import StoreMetadata, Instance, CloneTransfer, TransferStatus
from librarian_server.database import get_session

from sqlalchemy.orm import Session


logger = logging.getLogger("schedule")
Expand All @@ -33,19 +35,23 @@ class CreateLocalClone(Task):

# TODO: In the future, we could implement a _rolling_ n day clone here, i.e. only keep the last n days of files on the clone_to store.

def get_store(self, name: str) -> StoreMetadata:
def get_store(self, name: str, session: Session) -> StoreMetadata:
possible_metadata = (
query(StoreMetadata).filter(StoreMetadata.name == name).first()
session.query(StoreMetadata).filter_by(name=name).first()
)

if not possible_metadata:
raise ValueError(f"Store {name} does not exist.")

return possible_metadata

def on_call(self):
with get_session() as session:
return self.core(session=session)

def core(self, session: Session):
try:
store_from = self.get_store(self.clone_from)
store_from = self.get_store(self.clone_from, session)
except ValueError:
# Store doesn't exist. Cancel this job.
logger.error(
Expand All @@ -54,7 +60,7 @@ def on_call(self):
return CancelJob

try:
store_to = self.get_store(self.clone_to)
store_to = self.get_store(self.clone_to, session)
except ValueError:
# Store doesn't exist. Cancel this job.
logger.error(
Expand All @@ -67,7 +73,7 @@ def on_call(self):

# Now we can query the database for all files that were uploaded in the past age_in_days days.
instances: list[Instance] = (
query(Instance)
session.query(Instance)
.filter(Instance.store == store_from)
.filter(Instance.created_time > start_time)
.all()
Expand All @@ -81,7 +87,7 @@ def on_call(self):
# Check if there is a matching instance already on our clone_to store.
# If there is, we don't need to clone it.
if (
query(Instance)
session.query(Instance)
.filter(Instance.store == store_to)
.filter(Instance.file == instance.file)
.first()
Expand Down Expand Up @@ -113,7 +119,7 @@ def on_call(self):
f"File {instance.file.name} is too large to fit on store {store_to}. Skipping."
)

transfer.fail_transfer()
transfer.fail_transfer(session=session)

all_transfers_successful = False

Expand All @@ -136,15 +142,15 @@ def on_call(self):
f"Failed to transfer file {instance.path} to store {store_to} using transfer manager {transfer_manager}."
)

transfer.fail_transfer()
transfer.fail_transfer(session=session)

continue
except FileNotFoundError as e:
logger.error(
f"File {instance.path} does not exist on store {store_from}. Skipping."
)

transfer.fail_transfer()
transfer.fail_transfer(session=session)

all_transfers_successful = False

Expand All @@ -155,7 +161,7 @@ def on_call(self):
f"Failed to transfer file {instance.path} to store {store_to}. Skipping."
)

transfer.fail_transfer()
transfer.fail_transfer(session=session)

all_transfers_successful = False

Expand All @@ -176,7 +182,7 @@ def on_call(self):
f"Expected {instance.file.checksum}, got {path_info.md5}."
)

transfer.fail_transfer()
transfer.fail_transfer(session=session)

store_to.store_manager.unstage(staged_path)

Expand All @@ -193,7 +199,7 @@ def on_call(self):
)
store_to.store_manager.unstage(staging_name)

transfer.fail_transfer()
transfer.fail_transfer(session=session)

all_transfers_successful = False

Expand Down
39 changes: 25 additions & 14 deletions librarian_background/recieve_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from .task import Task

from librarian_server.database import session, query
from librarian_server.database import get_session
from librarian_server.orm import (
File,
Instance,
Expand All @@ -28,6 +28,10 @@
CloneCompleteResponse,
)

from typing import TYPE_CHECKING

from sqlalchemy.orm import Session

logger = logging.getLogger("schedule")


Expand All @@ -39,15 +43,21 @@ class RecieveClone(Task):
deletion_policy: DeletionPolicy = DeletionPolicy.DISALLOWED

def on_call(self):
with get_session() as session:
return self.core(session=session)

def core(self, session: Session):
"""
Checks for incoming transfers and processes them.
"""

# Find incoming transfers that are ONGOING
ongoing_transfers: list[IncomingTransfer] = query(
IncomingTransfer, status=TransferStatus.ONGOING
).all()

ongoing_transfers: list[IncomingTransfer] = (
session.query(IncomingTransfer)
.filter_by(status=TransferStatus.ONGOING)
.all()
)

all_transfers_succeeded = True

if len(ongoing_transfers) == 0:
Expand Down Expand Up @@ -118,7 +128,7 @@ def on_call(self):
path=path_info.path,
file=file,
store=store,
deletion_policy=self.deletion_policy
deletion_policy=self.deletion_policy,
)

session.add(file)
Expand All @@ -132,9 +142,9 @@ def on_call(self):
session.commit()

# Callback to the source librarian.
librarian: Optional[Librarian] = query(
Librarian, name=transfer.source
).first()
librarian: Optional[Librarian] = (
session.query(Librarian).filter_by(name=transfer.source).first()
)

if librarian:
# Need to call back
Expand All @@ -148,10 +158,12 @@ def on_call(self):
)

try:
response: CloneCompleteResponse = librarian.client.do_pydantic_http_post(
endpoint="/api/v2/clone/complete",
request_model=request,
response_model=CloneCompleteResponse,
response: CloneCompleteResponse = (
librarian.client.do_pydantic_http_post(
endpoint="/api/v2/clone/complete",
request_model=request,
response_model=CloneCompleteResponse,
)
)
except Exception as e:
logger.error(
Expand All @@ -168,5 +180,4 @@ def on_call(self):
logger.info(f"Transfer {transfer.id} has not yet completed. Skipping.")
continue


return all_transfers_succeeded
33 changes: 22 additions & 11 deletions librarian_background/send_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from schedule import CancelJob
from pathlib import Path

from librarian_server.database import session, query
from librarian_server.database import get_session
from librarian_server.orm import (
StoreMetadata,
Instance,
Expand All @@ -30,6 +30,13 @@
CloneOngoingResponse
)

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from hera_librarian import LibrarianClient

from sqlalchemy.orm import Session

logger = logging.getLogger("schedule")


Expand All @@ -49,13 +56,17 @@ class SendClone(Task):
"Name of the store to prefer when sending files. If None, we will use whatever store is available for sending that file."

def on_call(self):
with get_session() as session:
return self.core(session=session)

def core(self, session: Session):
"""
Creates uploads to the remote librarian as specified.
"""
# Before even attempting to do anything, get the information about the librarian and create
# a client connection to it.
librarian: Optional[Librarian] = query(
Librarian, Librarian.name == self.destination_librarian
librarian: Optional[Librarian] = session.query(
Librarian).filter_by(name=self.destination_librarian
).first()

if librarian is None:
Expand All @@ -79,19 +90,19 @@ def on_call(self):
age_in_days = datetime.timedelta(days=self.age_in_days)
oldest_file_age = current_time - age_in_days

files_without_remote_instances: list[File] = query(
File,
File.create_time > oldest_file_age,
File.remote_instances.any(librarian_name=self.destination_librarian),
# Pass each criterion as its own filter() argument so SQLAlchemy joins them
# with SQL AND; Python's `and` is evaluated by the interpreter and cannot
# build that expression.
# NOTE(review): despite the variable name, `.any(...)` selects files that DO
# have a remote instance on the destination librarian -- confirm whether a
# negation (`~`) was intended here.
files_without_remote_instances: list[File] = (
    session.query(File)
    .filter(
        File.create_time > oldest_file_age,
        File.remote_instances.any(librarian_name=self.destination_librarian),
    )
    .all()
)

logger.info(
f"Found {len(files_without_remote_instances)} files without remote instances."
)

if self.store_preference is not None:
use_store: StoreMetadata = query(
StoreMetadata, StoreMetadata.name == self.store_preference
use_store: StoreMetadata = session.query(
StoreMetadata).filter_by(name = self.store_preference
).first()

if use_store is None:
Expand Down Expand Up @@ -159,7 +170,7 @@ def on_call(self):
)

# Mark the transfer as failed.
transfer.fail_transfer()
transfer.fail_transfer(session=session)
continue


Expand Down Expand Up @@ -192,7 +203,7 @@ def on_call(self):
f"Failed to transfer file {instance.path} to remote store. Skipping."
)

transfer.fail_transfer()
transfer.fail_transfer(session=session)
continue

# Great! We can now mark the transfer as ONGOING in the background.
Expand Down
Loading

0 comments on commit 69732c1

Please sign in to comment.