Skip to content

Commit

Permalink
removed busy for now
Browse files Browse the repository at this point in the history
  • Loading branch information
arietrouw committed Jul 13, 2023
1 parent d2d8746 commit 2dbe40d
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,13 @@ export abstract class AbstractIndirectArchivist<
}

protected async getHandler(hashes: string[]): Promise<Payload[]> {
return await this.busy(async () => {
return compact(
await Promise.all(
hashes.map(async (hash) => {
return (await this.getFromParents(hash)) ?? null
}),
),
)
})
return compact(
await Promise.all(
hashes.map(async (hash) => {
return (await this.getFromParents(hash)) ?? null
}),
),
)
}

protected head(): Promisable<Payload | undefined> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,43 +83,33 @@ export class IndexedDbArchivist<
}

protected override async allHandler(): Promise<Payload[]> {
return await this.busy(async () => {
const result = await entries<string, Payload>(this.db)
return result.map<Payload>(([_hash, payload]) => payload)
})
const result = await entries<string, Payload>(this.db)
return result.map<Payload>(([_hash, payload]) => payload)
}

protected override async clearHandler(): Promise<void> {
return await this.busy(async () => {
await clear(this.db)
})
await clear(this.db)
}

protected override async deleteHandler(hashes: string[]): Promise<boolean[]> {
return await this.busy(async () => {
await delMany(hashes, this.db)
return hashes.map((_) => true)
})
await delMany(hashes, this.db)
return hashes.map((_) => true)
}

protected override async getHandler(hashes: string[]): Promise<Payload[]> {
return await this.busy(async () => {
const result = await getMany<Payload>(hashes, this.db)
return result
})
const result = await getMany<Payload>(hashes, this.db)
return result
}

protected async insertHandler(payloads: Payload[]): Promise<BoundWitness[]> {
return await this.busy(async () => {
const entries = await Promise.all(
payloads.map<Promise<[string, Payload]>>(async (payload) => {
const hash = await PayloadHasher.hashAsync(payload)
return [hash, payload]
}),
)
await setMany(entries, this.db)
const [result] = await this.bindQueryResult({ payloads, schema: ArchivistInsertQuerySchema }, payloads)
return [result[0]]
})
const entries = await Promise.all(
payloads.map<Promise<[string, Payload]>>(async (payload) => {
const hash = await PayloadHasher.hashAsync(payload)
return [hash, payload]
}),
)
await setMany(entries, this.db)
const [result] = await this.bindQueryResult({ payloads, schema: ArchivistInsertQuerySchema }, payloads)
return [result[0]]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,21 @@ export class MemoryArchivist<
}

protected override async commitHandler(): Promise<BoundWitness[]> {
return await this.busy(async () => {
const payloads = assertEx(await this.allHandler(), 'Nothing to commit')
const settled = await Promise.allSettled(
compact(
Object.values((await this.parents()).commit ?? [])?.map(async (parent) => {
const queryPayload = PayloadWrapper.wrap<ArchivistInsertQuery>({
payloads: await Promise.all(payloads.map((payload) => PayloadWrapper.hashAsync(payload))),
schema: ArchivistInsertQuerySchema,
})
const query = await this.bindQuery(queryPayload, payloads)
return (await parent?.query(query[0], query[1]))?.[0]
}),
),
)
await this.clearHandler()
return compact(settled.filter(fulfilled).map((result) => result.value))
})
const payloads = assertEx(await this.allHandler(), 'Nothing to commit')
const settled = await Promise.allSettled(
compact(
Object.values((await this.parents()).commit ?? [])?.map(async (parent) => {
const queryPayload = PayloadWrapper.wrap<ArchivistInsertQuery>({
payloads: await Promise.all(payloads.map((payload) => PayloadWrapper.hashAsync(payload))),
schema: ArchivistInsertQuerySchema,
})
const query = await this.bindQuery(queryPayload, payloads)
return (await parent?.query(query[0], query[1]))?.[0]
}),
),
)
await this.clearHandler()
return compact(settled.filter(fulfilled).map((result) => result.value))
}

protected override async deleteHandler(hashes: string[]): Promise<boolean[]> {
Expand All @@ -97,43 +95,39 @@ export class MemoryArchivist<
}

protected override async getHandler(hashes: string[]): Promise<Payload[]> {
return await this.busy(async () => {
return compact(
await Promise.all(
hashes.map(async (hash) => {
const payload = this.cache.get(hash) ?? (await super.get([hash]))[0] ?? null
if (this.storeParentReads) {
// NOTE: `payload` can actually be `null` here but TS doesn't seem
// to recognize it. LRUCache claims not to support `null`s via their
// types but seems to under the hood just fine.
this.cache.set(hash, payload)
}
return payload
}),
),
)
})
return compact(
await Promise.all(
hashes.map(async (hash) => {
const payload = this.cache.get(hash) ?? (await super.get([hash]))[0] ?? null
if (this.storeParentReads) {
// NOTE: `payload` can actually be `null` here but TS doesn't seem
// to recognize it. LRUCache claims not to support `null`s via their
// types but seems to under the hood just fine.
this.cache.set(hash, payload)
}
return payload
}),
),
)
}

protected async insertHandler(payloads: Payload[]): Promise<BoundWitness[]> {
return await this.busy(async () => {
await Promise.all(
payloads.map((payload) => {
return this.insertPayloadIntoCache(payload)
}),
)
await Promise.all(
payloads.map((payload) => {
return this.insertPayloadIntoCache(payload)
}),
)

const [result] = await this.bindQueryResult({ payloads, schema: ArchivistInsertQuerySchema }, payloads)
const parentBoundWitnesses: BoundWitness[] = []
const parents = await this.parents()
if (Object.entries(parents.write ?? {}).length) {
// We store the child bw also
parentBoundWitnesses.push(...(await this.writeToParents([result[0], ...payloads])))
}
const boundWitnesses = [result[0], ...parentBoundWitnesses]
await this.emit('inserted', { boundWitnesses, module: this })
return boundWitnesses
})
const [result] = await this.bindQueryResult({ payloads, schema: ArchivistInsertQuerySchema }, payloads)
const parentBoundWitnesses: BoundWitness[] = []
const parents = await this.parents()
if (Object.entries(parents.write ?? {}).length) {
// We store the child bw also
parentBoundWitnesses.push(...(await this.writeToParents([result[0], ...payloads])))
}
const boundWitnesses = [result[0], ...parentBoundWitnesses]
await this.emit('inserted', { boundWitnesses, module: this })
return boundWitnesses
}

private async insertPayloadIntoCache(payload: Payload): Promise<Payload> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,37 +51,31 @@ export abstract class AbstractBridge<
override async resolve<TModule extends Module = Module>(filter?: ModuleFilter): Promise<TModule[]>
override async resolve<TModule extends Module = Module>(nameOrAddress: string): Promise<TModule | undefined>
override async resolve<TModule extends Module = Module>(nameOrAddressOrFilter?: ModuleFilter | string): Promise<TModule | TModule[] | undefined> {
return await this.busy(async () => {
switch (typeof nameOrAddressOrFilter) {
case 'string': {
const byAddress = Account.isAddress(nameOrAddressOrFilter)
? (await super.resolve<TModule>({ address: [nameOrAddressOrFilter] })).pop() ??
(await this.targetDownResolver().resolve<TModule>({ address: [nameOrAddressOrFilter] })).pop()
: undefined
return (
byAddress ??
(await super.resolve<TModule>({ name: [nameOrAddressOrFilter] })).pop() ??
(await this.targetDownResolver().resolve<TModule>({ name: [nameOrAddressOrFilter] })).pop()
)
}
default: {
const filter: ModuleFilter | undefined = nameOrAddressOrFilter
return [...(await this.targetDownResolver().resolve<TModule>(filter)), ...(await super.resolve<TModule>(filter))].filter(duplicateModules)
}
switch (typeof nameOrAddressOrFilter) {
case 'string': {
const byAddress = Account.isAddress(nameOrAddressOrFilter)
? (await super.resolve<TModule>({ address: [nameOrAddressOrFilter] })).pop() ??
(await this.targetDownResolver().resolve<TModule>({ address: [nameOrAddressOrFilter] })).pop()
: undefined
return (
byAddress ??
(await super.resolve<TModule>({ name: [nameOrAddressOrFilter] })).pop() ??
(await this.targetDownResolver().resolve<TModule>({ name: [nameOrAddressOrFilter] })).pop()
)
}
})
default: {
const filter: ModuleFilter | undefined = nameOrAddressOrFilter
return [...(await this.targetDownResolver().resolve<TModule>(filter)), ...(await super.resolve<TModule>(filter))].filter(duplicateModules)
}
}
}

targetDownResolver(address?: string): BridgeModuleResolver {
this._targetDownResolvers[address ?? 'root'] = this._targetDownResolvers[address ?? 'root'] ?? new BridgeModuleResolver(this, this.account)
return this._targetDownResolvers[address ?? 'root'] as BridgeModuleResolver
}

async targetResolve(address: string, filter?: ModuleFilter) {
//TODO: Honor address so that the resolve only is done through that remote module
//right now, we check the entire remote hive
return (await this.targetDownResolver(address).resolve(filter)) as TModule[]
}
async targetResolve(address: string, filter?: ModuleFilter) {}

protected override async queryHandler<T extends QueryBoundWitness = QueryBoundWitness>(query: T, payloads?: Payload[]): Promise<ModuleQueryResult> {
const wrapper = QueryBoundWitnessWrapper.parseQuery<BridgeQuery>(query, payloads)
Expand Down
94 changes: 36 additions & 58 deletions packages/modules/packages/bridge/packages/http/src/HttpBridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,70 +118,48 @@ export class HttpBridge<
}

async targetDiscover(address?: string): Promise<Payload[]> {
return await this.busy(async () => {
//if caching, return cached result if exists
const cachedResult = this.discoverCache?.get(address ?? 'root ')
if (cachedResult) {
return cachedResult
}
const addressToDiscover = address ?? this.rootAddress
const queryPayload = PayloadWrapper.wrap<ModuleDiscoverQuery>({ schema: ModuleDiscoverQuerySchema })
const boundQuery = await this.bindQuery(queryPayload)
const discover = assertEx(await this.targetQuery(addressToDiscover, boundQuery[0], boundQuery[1]), `Unable to resolve [${address}]`)[1]

this._targetQueries[addressToDiscover] = compact(
discover?.map((payload) => {
if (payload.schema === QuerySchema) {
const schemaPayload = payload as QueryPayload
return schemaPayload.query
} else {
return null
}
}) ?? [],
)

const targetConfigSchema = assertEx(
discover.find((payload) => payload.schema === ConfigSchema) as ConfigPayload,
`Discover did not return a [${ConfigSchema}] payload`,
).config

this._targetConfigs[addressToDiscover] = assertEx(
discover?.find((payload) => payload.schema === targetConfigSchema) as ModuleConfig,
`Discover did not return a [${targetConfigSchema}] payload`,
)

//if caching, set entry
this.discoverCache?.set(address ?? 'root', discover)

return discover
})
//if caching, return cached result if exists
const cachedResult = this.discoverCache?.get(address ?? 'root ')
if (cachedResult) {
return cachedResult
}
const addressToDiscover = address ?? this.rootAddress
const queryPayload = PayloadWrapper.wrap<ModuleDiscoverQuery>({ schema: ModuleDiscoverQuerySchema })
const boundQuery = await this.bindQuery(queryPayload)
const discover = assertEx(await this.targetQuery(addressToDiscover, boundQuery[0], boundQuery[1]), `Unable to resolve [${address}]`)[1]

this._targetQueries[addressToDiscover] = compact(
discover?.map((payload) => {
if (payload.schema === QuerySchema) {
const schemaPayload = payload as QueryPayload
return schemaPayload.query
} else {
return null
}
}) ?? [],
)

const targetConfigSchema = assertEx(
discover.find((payload) => payload.schema === ConfigSchema) as ConfigPayload,
`Discover did not return a [${ConfigSchema}] payload`,
).config

this._targetConfigs[addressToDiscover] = assertEx(
discover?.find((payload) => payload.schema === targetConfigSchema) as ModuleConfig,
`Discover did not return a [${targetConfigSchema}] payload`,
)

//if caching, set entry
this.discoverCache?.set(address ?? 'root', discover)

return discover
}

targetQueries(address: string): string[] {
return assertEx(this._targetQueries[address], `targetQueries not set [${address}]`)
}

async targetQuery(address: string, query: QueryBoundWitness, payloads: Payload[] = []): Promise<ModuleQueryResult> {
return await this.busy(async () => {
try {
const moduleUrlString = this.moduleUrl(address).toString()
const result = await this.axios.post<ApiEnvelope<ModuleQueryResult>>(moduleUrlString, [query, payloads])
if (result.status === 404) {
throw `target module not found [${moduleUrlString}] [${result.status}]`
}
if (result.status >= 400) {
this.logger?.error(`targetQuery failed [${moduleUrlString}]`)
throw `targetQuery failed [${moduleUrlString}] [${result.status}]`
}
return result.data.data
} catch (ex) {
const error = ex as AxiosError
this.logger?.error(`Error Status: ${error.status}`)
this.logger?.error(`Error Cause: ${JSON.stringify(error.cause, null, 2)}`)
throw error
}
})
}
async targetQuery(address: string, query: QueryBoundWitness, payloads: Payload[] = []): Promise<ModuleQueryResult> {}

targetQueryable(_address: string, _query: QueryBoundWitness, _payloads?: Payload[], _queryConfig?: ModuleConfig): boolean {
return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,12 @@ export abstract class AbstractIndirectModule<TParams extends ModuleParams = Modu
queryConfig?: TConfig,
): Promise<ModuleQueryResult> {
this.started('throw')
return await this.busy(async () => {
const result = await this.queryHandler(assertEx(QueryBoundWitnessWrapper.unwrap(query)), payloads, queryConfig)
const result = await this.queryHandler(assertEx(QueryBoundWitnessWrapper.unwrap(query)), payloads, queryConfig)

const args: ModuleQueriedEventArgs = { module: this, payloads, query, result }
await this.emit('moduleQueried', args)
const args: ModuleQueriedEventArgs = { module: this, payloads, query, result }
await this.emit('moduleQueried', args)

return result
})
return result
}

queryable<T extends QueryBoundWitness = QueryBoundWitness, TConfig extends ModuleConfig = ModuleConfig>(
Expand Down
Loading

0 comments on commit 2dbe40d

Please sign in to comment.