Commit

Try to split requests by archive
Closes #8069

Co-authored-by: BijinDev <bir@tutao.de>
paw-hub and BijinDev committed Dec 17, 2024
1 parent 41933f8 commit 6b16bf4
Showing 4 changed files with 175 additions and 31 deletions.
68 changes: 49 additions & 19 deletions src/common/api/worker/facades/BlobAccessTokenFacade.ts
@@ -6,7 +6,7 @@ import { BlobServerAccessInfo, createBlobAccessTokenPostIn, createBlobReadData,
import { DateProvider } from "../../common/DateProvider.js"
import { resolveTypeReference } from "../../common/EntityFunctions.js"
import { AuthDataProvider } from "./UserFacade.js"
import { isEmpty, TypeRef } from "@tutao/tutanota-utils"
import { deduplicate, getFirstOrThrow, isEmpty, lazyMemoized, TypeRef } from "@tutao/tutanota-utils"
import { ProgrammingError } from "../../common/error/ProgrammingError.js"
import { BlobLoadOptions } from "./lazy/BlobFacade.js"
import { BlobReferencingInstance } from "../../common/utils/BlobUtils.js"
@@ -70,14 +70,17 @@ export class BlobAccessTokenFacade {

/**
* Requests a token that grants read access to all blobs that are referenced by the given instances.
* A user must be owner of the instance but must not be owner of the archive were the blobs are stored in.
* A user must be owner of the instance but must not be owner of the archive where the blobs are stored in.
*
* @param archiveDataType specify the data type
* @param referencingInstances the instances that reference the blobs
* @param blobLoadOptions load options when loading blobs
* @throws ProgrammingError if instances are not part of the same list or blobs are not part of the same archive.
*/
async requestReadTokenMultipleBlobs(
async requestReadTokenMultipleInstances(
archiveDataType: ArchiveDataType,
referencingInstances: readonly BlobReferencingInstance[],
blobLoadOptions: BlobLoadOptions,
): Promise<BlobServerAccessInfo> {
if (isEmpty(referencingInstances)) {
throw new ProgrammingError("Must pass at least one referencing instance")
@@ -86,7 +89,8 @@
if (!referencingInstances.every((instance) => instance.listId === instanceListId)) {
throw new ProgrammingError("All referencing instances must be part of the same list")
}
const requestNewToken = async () => {

const requestNewToken = lazyMemoized(async () => {
const archiveId = this.getArchiveId(referencingInstances)
const instanceIds = referencingInstances.map(({ elementId }) => createInstanceId({ instanceId: elementId }))
const tokenRequest = createBlobAccessTokenPostIn({
@@ -98,18 +102,22 @@
}),
write: null,
})
const { blobAccessInfo } = await this.serviceExecutor.post(BlobAccessTokenService, tokenRequest)
const { blobAccessInfo } = await this.serviceExecutor.post(BlobAccessTokenService, tokenRequest, blobLoadOptions)
return blobAccessInfo
}
// Does not cache them. We could put them in the cache for each of the referencing instances, but we need to change how the cache works to do that.
return requestNewToken()
})

return this.readBlobCache.getTokenMultiple(
referencingInstances.map((instance) => instance.elementId),
requestNewToken,
)
}
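
With this, one token request can cover all attachments of a mail, as long as the referencing instances are in the same list and their blobs in the same archive. A minimal usage sketch, not part of the commit — the facade and instances are assumed to come from the caller's context:

    // Hedged usage sketch: fetch one shared read token for several
    // referencing instances (e.g. all attachments of one mail).
    async function fetchSharedReadToken(
        facade: BlobAccessTokenFacade,
        instances: readonly BlobReferencingInstance[],
    ): Promise<BlobServerAccessInfo> {
        // ArchiveDataType.Attachments because the instances here are mail attachments.
        return facade.requestReadTokenMultipleInstances(ArchiveDataType.Attachments, instances, {})
    }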

/**
* Requests a token that grants read access to all blobs that are referenced by the given instance.
* A user must be owner of the instance but must not be owner of the archive were the blobs are stored in.
* @param archiveDataType specify the data type
* @param referencingInstance the instance that references the blobs
* @param blobLoadOptions load options when loading blobs
*/
async requestReadTokenBlobs(
archiveDataType: ArchiveDataType,
@@ -144,6 +152,14 @@
this.readBlobCache.evict(referencingInstance.elementId)
}

/**
* Remove a given read blobs token from the cache.
* @param referencingInstances
*/
evictReadBlobsTokenMultipleBlobs(referencingInstances: BlobReferencingInstance[]): void {
this.readBlobCache.evictAll(referencingInstances.map((instance) => instance.elementId))
}

/**
* Requests a token that grants access to all blobs stored in the given archive. The user must own the archive (member of group)
* @param archiveId ID for the archive to read blobs from
@@ -221,27 +237,41 @@ function canBeUsedForAnotherRequest(blobServerAccessInfo: BlobServerAccessInfo,
return blobServerAccessInfo.expires.getTime() > dateProvider.now()
}

class BlobAccessTokenCache<K> {
private cache: Map<K, BlobServerAccessInfo>
class BlobAccessTokenCache<IdType> {
private cache: Map<IdType, BlobServerAccessInfo>
private dateProvider: DateProvider

constructor(dateProvider: DateProvider) {
this.cache = new Map<K, BlobServerAccessInfo>()
this.cache = new Map<IdType, BlobServerAccessInfo>()
this.dateProvider = dateProvider
}

public async getToken(key: K, loader: () => Promise<BlobServerAccessInfo>): Promise<BlobServerAccessInfo> {
const cached = this.cache.get(key)
if (cached && canBeUsedForAnotherRequest(cached, this.dateProvider)) {
return cached
} else {
public async getToken(id: IdType, loader: () => Promise<BlobServerAccessInfo>): Promise<BlobServerAccessInfo> {
return this.getTokenMultiple([id], loader)
}

public async getTokenMultiple(ids: IdType[], loader: () => Promise<BlobServerAccessInfo>): Promise<BlobServerAccessInfo> {
const tokens = deduplicate(ids.map((id) => this.cache.get(id) ?? null))
const firstTokenFound = getFirstOrThrow(tokens)

if (tokens.length != 1 || firstTokenFound == null || !canBeUsedForAnotherRequest(firstTokenFound, this.dateProvider)) {
const newToken = await loader()
this.cache.set(key, newToken)
for (const id of ids) {
this.cache.set(id, newToken)
}
return newToken
} else {
return firstTokenFound
}
}

public evict(key: K): void {
this.cache.delete(key)
public evict(id: IdType): void {
this.evictAll([id])
}

public evictAll(ids: IdType[]): void {
for (const id of ids) {
this.cache.delete(id)
}
}
}
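
The multi-id lookup above reduces to: reuse a cached token only if every requested id resolves to the same, still-valid token object; otherwise call the loader once and cache the fresh token under all ids. A standalone sketch of just that logic, with simplified, hypothetical types (the real code uses deduplicate and getFirstOrThrow from @tutao/tutanota-utils):

    // Standalone sketch of the cache semantics; AccessInfo is a simplified
    // stand-in for BlobServerAccessInfo.
    interface AccessInfo {
        expires: Date
    }

    class TokenCacheSketch<IdType> {
        private readonly cache = new Map<IdType, AccessInfo>()

        async getTokenMultiple(ids: IdType[], loader: () => Promise<AccessInfo>): Promise<AccessInfo> {
            // Reference-deduplicate the cached entries for all requested ids.
            const distinct = new Set(ids.map((id) => this.cache.get(id) ?? null))
            const [first] = distinct
            if (distinct.size === 1 && first != null && first.expires.getTime() > Date.now()) {
                // Every id already shares one unexpired token: reuse it.
                return first
            }
            // Otherwise fetch a single new token and fan it out to all ids.
            const fresh = await loader()
            for (const id of ids) {
                this.cache.set(id, fresh)
            }
            return fresh
        }
    }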
112 changes: 109 additions & 3 deletions src/common/api/worker/facades/lazy/BlobFacade.ts
@@ -1,7 +1,17 @@
import { addParamsToUrl, isSuspensionResponse, RestClient, SuspensionBehavior } from "../../rest/RestClient.js"
import { CryptoFacade } from "../../crypto/CryptoFacade.js"
import { clear, concat, neverNull, promiseMap, splitUint8ArrayInChunks, uint8ArrayToBase64, uint8ArrayToString } from "@tutao/tutanota-utils"
import { ArchiveDataType, MAX_BLOB_SIZE_BYTES } from "../../../common/TutanotaConstants.js"
import {
assertNotNull,
clear,
concat,
isEmpty,
neverNull,
promiseMap,
splitUint8ArrayInChunks,
uint8ArrayToBase64,
uint8ArrayToString,
} from "@tutao/tutanota-utils"
import { ArchiveDataType, GroupType, MAX_BLOB_SIZE_BYTES } from "../../../common/TutanotaConstants.js"

import { HttpMethod, MediaType, resolveTypeReference } from "../../../common/EntityFunctions.js"
import { assertWorkerOrNode, isApp, isDesktop } from "../../../common/Env.js"
@@ -11,7 +21,7 @@ import { aesDecrypt, AesKey, sha256Hash } from "@tutao/tutanota-crypto"
import type { FileUri, NativeFileApp } from "../../../../native/common/FileApp.js"
import type { AesApp } from "../../../../native/worker/AesApp.js"
import { InstanceMapper } from "../../crypto/InstanceMapper.js"
import { Blob, BlobReferenceTokenWrapper, createBlobReferenceTokenWrapper } from "../../../entities/sys/TypeRefs.js"
import { Blob, BlobReferenceTokenWrapper, createBlobReferenceTokenWrapper, GroupTypeRef } from "../../../entities/sys/TypeRefs.js"
import { FileReference } from "../../../common/utils/FileUtils.js"
import { handleRestError } from "../../../common/error/RestError.js"
import { ProgrammingError } from "../../../common/error/ProgrammingError.js"
@@ -24,6 +34,7 @@ import { DefaultEntityRestCache } from "../../rest/DefaultEntityRestCache.js"
import { SomeEntity } from "../../../common/EntityTypes.js"
import { encryptBytes } from "../../crypto/CryptoWrapper.js"
import { BlobReferencingInstance } from "../../../common/utils/BlobUtils.js"
import { locator } from "../../../../../mail-app/workerUtils/worker/WorkerLocator"

assertWorkerOrNode()
export const BLOB_SERVICE_REST_PATH = `/rest/${BlobService.app}/${BlobService.name.toLowerCase()}`
@@ -124,6 +135,101 @@ export class BlobFacade {
return concat(...blobData)
}

/**
* Downloads multiple blobs, decrypts and joins them to unencrypted binary data.
*
* @param archiveDataType
* @param referencingInstances the instances that directly reference the blobs
* @returns Uint8Array unencrypted binary data
*/
async downloadAndDecryptMultipleInstances(
archiveDataType: ArchiveDataType,
referencingInstances: BlobReferencingInstance[],
blobLoadOptions: BlobLoadOptions = {},
): Promise<Map<Id, Promise<Uint8Array>>> {
if (isEmpty(referencingInstances)) {
return new Map()
}

if (referencingInstances.length === 1) {
const downloaded = this.downloadAndDecrypt(archiveDataType, referencingInstances[0], blobLoadOptions)
return new Map([[referencingInstances[0].elementId, downloaded]])
}

// If a mail has multiple attachments, we cannot assume they are all on the same archive.
const allArchives: Map<Id, BlobReferencingInstance[]> = new Map()
for (const instance of referencingInstances) {
const archiveId = instance.blobs[0].archiveId
const archive = allArchives.get(archiveId) ?? []
archive.push(instance)
allArchives.set(archiveId, archive)
}

// We can assume that they were all uploaded by the same group though
const firstArchive: Id = assertNotNull(allArchives.keys().next().value)

// FIXME: find a better way to figure out if we own the archive (also, don't use locator!)
const entityRestCache = assertNotNull(this.entityRestCache)
const allMailGroupIds = locator.user
.getLoggedInUser()
.memberships.filter((membership) => membership.groupType === GroupType.Mail)
.map((membership) => membership.group)
const allMailGroups = await entityRestCache.loadMultiple(GroupTypeRef, null, allMailGroupIds)
const allArchivesWeHaveAccessTo = allMailGroups.flatMap((group) => group.archives).flatMap((archive) => [archive.active, ...archive.inactive])
const ownArchives = allArchivesWeHaveAccessTo.some((archive) => archive.archiveId === firstArchive)

// file to data
const result: Map<Id, Promise<Uint8Array>> = new Map()

for (const [archive, instances] of allArchives.entries()) {
let latestAccessInfo: Promise<BlobServerAccessInfo> | null = null

for (const instance of instances) {
const sessionKey = await this.resolveSessionKey(instance.entity)
let doBlobRequest
let doEvictToken

if (ownArchives) {
doBlobRequest = async () => {
if (latestAccessInfo == null) {
latestAccessInfo = this.blobAccessTokenFacade.requestReadTokenArchive(archive)
}
const accessInfoToUse = await latestAccessInfo
return promiseMap(instance.blobs, (blob) => this.downloadAndDecryptChunk(blob, accessInfoToUse, sessionKey, blobLoadOptions))
}
doEvictToken = () => {
this.blobAccessTokenFacade.evictArchiveToken(archive)
latestAccessInfo = null
}
} else {
doBlobRequest = async () => {
if (latestAccessInfo == null) {
latestAccessInfo = this.blobAccessTokenFacade.requestReadTokenMultipleInstances(
ArchiveDataType.Attachments,
instances,
blobLoadOptions,
)
}
const accessInfoToUse = await latestAccessInfo
return promiseMap(instance.blobs, (blob) => this.downloadAndDecryptChunk(blob, accessInfoToUse, sessionKey, blobLoadOptions))
}
doEvictToken = () => {
this.blobAccessTokenFacade.evictReadBlobsToken(instance)
latestAccessInfo = null
}
}

const request = doBlobRequestWithRetry(doBlobRequest, doEvictToken)
result.set(
instance.elementId,
request.then((data) => concat(...data)),
)
}
}

return result
}
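
The core of the new method is the bucketing step above: attachments are grouped by the archive holding their first blob, so each archive needs at most one token request. A reduced sketch of that grouping, with simplified types (the code assumes all blobs of one instance share an archive, so the first blob decides):

    // Reduced sketch of the per-archive grouping in downloadAndDecryptMultipleInstances;
    // RefInstance is a simplified stand-in for BlobReferencingInstance.
    interface RefInstance {
        elementId: string
        blobs: { archiveId: string }[]
    }

    function groupByArchive(instances: readonly RefInstance[]): Map<string, RefInstance[]> {
        const byArchive = new Map<string, RefInstance[]>()
        for (const instance of instances) {
            // Assumption: every blob of an instance lives in the same archive.
            const archiveId = instance.blobs[0].archiveId
            const bucket = byArchive.get(archiveId) ?? []
            bucket.push(instance)
            byArchive.set(archiveId, bucket)
        }
        return byArchive
    }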

/**
* Downloads multiple blobs, decrypts and joins them to unencrypted binary data which will be stored as a file on the
* device.
22 changes: 13 additions & 9 deletions src/common/api/worker/facades/lazy/MailExportFacade.ts
@@ -1,14 +1,15 @@
import { File as TutanotaFile, Mail } from "../../../entities/tutanota/TypeRefs"
import { assertWorkerOrNode } from "../../../common/Env"
import { isNotNull, promiseMap } from "@tutao/tutanota-utils"
import { NotFoundError } from "../../../common/error/RestError"
import { BulkMailLoader, MailWithMailDetails } from "../../../../../mail-app/workerUtils/index/BulkMailLoader.js"
import { convertToDataFile, DataFile } from "../../../common/DataFile.js"
import { ArchiveDataType } from "../../../common/TutanotaConstants.js"
import { BlobFacade } from "./BlobFacade.js"
import { CryptoFacade } from "../../crypto/CryptoFacade.js"
import { createReferencingInstance } from "../../../common/utils/BlobUtils.js"
import { MailExportTokenFacade } from "./MailExportTokenFacade.js"
import { assertNotNull, isNotNull, promiseMap } from "@tutao/tutanota-utils"
import { NotFoundError } from "../../../common/error/RestError"
import { elementIdPart } from "../../../common/utils/EntityUtils"

assertWorkerOrNode()

@@ -46,14 +47,16 @@ export class MailExportFacade {

async loadAttachmentData(mail: Mail, attachments: readonly TutanotaFile[]): Promise<DataFile[]> {
const attachmentsWithKeys = await this.cryptoFacade.enforceSessionKeyUpdateIfNeeded(mail, attachments)
// TODO: download attachments efficiently.
// - download multiple blobs at once if possible
// - use file references instead of data files (introduce a similar type to MailBundle or change MailBundle)
const attachmentData = await promiseMap(attachmentsWithKeys, async (attachment) => {

const downloads = await this.mailExportTokenFacade.loadWithToken((token) => {
const referencingInstances = attachmentsWithKeys.map(createReferencingInstance)
return this.blobFacade.downloadAndDecryptMultipleInstances(ArchiveDataType.Attachments, referencingInstances, this.options(token))
})

const attachmentData = await promiseMap(downloads.entries(), async ([fileId, download]) => {
try {
const bytes = await this.mailExportTokenFacade.loadWithToken((token) =>
this.blobFacade.downloadAndDecrypt(ArchiveDataType.Attachments, createReferencingInstance(attachment), this.options(token)),
)
const bytes = await download
const attachment = assertNotNull(attachmentsWithKeys.find((attachment) => elementIdPart(attachment._id) === fileId))
return convertToDataFile(attachment, bytes)
} catch (e) {
if (e instanceof NotFoundError) {
Expand All @@ -63,6 +66,7 @@ export class MailExportFacade {
}
}
})

return attachmentData.filter(isNotNull)
}
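
Because downloadAndDecryptMultipleInstances hands back a map of per-file promises, each attachment can be awaited, and fail, independently, which is what the try/catch above relies on. A small sketch of that consumption pattern, with the Id type simplified to string:

    // Sketch: collect per-attachment downloads independently; a single failed
    // download (e.g. a NotFoundError for a deleted file) does not abort the rest.
    async function collectDownloads(downloads: Map<string, Promise<Uint8Array>>): Promise<Map<string, Uint8Array | null>> {
        const collected = new Map<string, Uint8Array | null>()
        for (const [fileId, download] of downloads.entries()) {
            try {
                collected.set(fileId, await download)
            } catch (e) {
                collected.set(fileId, null) // record the gap and keep going
            }
        }
        return collected
    }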

4 changes: 4 additions & 0 deletions src/mail-app/native/main/MailExportController.ts
@@ -15,6 +15,7 @@ import type { TranslationText } from "../../../common/misc/LanguageViewModel"
import { SuspensionError } from "../../../common/api/common/error/SuspensionError"
import { Scheduler } from "../../../common/api/common/utils/Scheduler"
import { ExportError, ExportErrorReason } from "../../../common/api/common/error/ExportError"
import { assertWorkerOrNode } from "../../../common/api/common/Env"

export type MailExportState =
| { type: "idle" }
@@ -28,6 +29,8 @@

const TAG = "MailboxExport"

assertWorkerOrNode()

/**
* Controller to keep the state of mail exporting with the details.
*/
Expand Down Expand Up @@ -168,6 +171,7 @@ export class MailExportController {

const downloadedMailDetails = await this.mailExportFacade.loadMailDetails(downloadedMails)
const attachmentInfo = await this.mailExportFacade.loadAttachments(downloadedMails)

for (const { mail, mailDetails } of downloadedMailDetails) {
if (this._state().type !== "exporting") {
return
