Skip to content

Commit

Permalink
We will now check ephemeral bucket for files.
Browse files Browse the repository at this point in the history
  • Loading branch information
KevinJoiner committed Nov 4, 2024
1 parent fa2f32e commit 7b53ec0
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 7 deletions.
43 changes: 43 additions & 0 deletions internal/fetch/fetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Package fetch provides functions for fetching objects from the index service.
package fetch

import (
"context"
"errors"
"fmt"

"github.com/DIMO-Network/nameindexer/pkg/clickhouse/indexrepo"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// GetObjectsFromIndexs gets objects from the index service by trying to get them from each bucket in the list returning the first successful result.
func GetObjectsFromIndexs(ctx context.Context, idxSvc *indexrepo.Service, indexKeys []string, buckets []string) ([]indexrepo.DataObject, error) {
dataObjects := make([]indexrepo.DataObject, 0, len(indexKeys))
for _, key := range indexKeys {
obj, err := GetObjectFromIndex(ctx, idxSvc, key, buckets)
if err != nil {
return nil, fmt.Errorf("failed to get object: %w", err)
}
dataObjects = append(dataObjects, obj)
}
return dataObjects, nil
}

// GetObjectFromIndex gets an object from the index service by trying to get it from each bucket in the list returning the first successful result.
func GetObjectFromIndex(ctx context.Context, idxSvc *indexrepo.Service, indexKeys string, buckets []string) (indexrepo.DataObject, error) {
var obj indexrepo.DataObject
var err error
// Try to get the object from each bucket in the list
for _, bucket := range buckets {
obj, err = idxSvc.GetObjectFromIndex(ctx, indexKeys, bucket)
if err != nil {
notFoundErr := &types.NoSuchKey{}
if errors.As(err, &notFoundErr) {
continue
}
return indexrepo.DataObject{}, fmt.Errorf("failed to get object: %w", err)
}
return obj, nil
}
return indexrepo.DataObject{}, fmt.Errorf("failed to get object: %w", err)
}
15 changes: 11 additions & 4 deletions internal/fetch/httphandler/httphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/DIMO-Network/fetch-api/internal/fetch"
"github.com/DIMO-Network/model-garage/pkg/cloudevent"
"github.com/DIMO-Network/nameindexer"
"github.com/DIMO-Network/nameindexer/pkg/clickhouse/indexrepo"
Expand Down Expand Up @@ -179,7 +180,11 @@ func (h *Handler) GetObjects(fCtx *fiber.Ctx) error {

opts := params.toSearchOptions(cloudevent.NFTDID{ChainID: h.chainID, ContractAddress: h.vehicleAddr, TokenID: uint32(uTokenID)})

data, err := h.indexService.GetObject(fCtx.Context(), h.cloudEventBucket, params.Limit, opts)
indexKeys, err := h.indexService.GetIndexKeys(fCtx.Context(), params.Limit, opts)
if err != nil {
return handleDBError(err, h.logger)
}
data, err := fetch.GetObjectsFromIndexs(fCtx.Context(), h.indexService, indexKeys, []string{h.cloudEventBucket, h.ephemeralBucket})
if err != nil {
return handleDBError(err, h.logger)
}
Expand Down Expand Up @@ -213,12 +218,14 @@ func (h *Handler) GetLatestObject(fCtx *fiber.Ctx) error {
}

opts := params.toSearchOptions(cloudevent.NFTDID{ChainID: h.chainID, ContractAddress: h.vehicleAddr, TokenID: uint32(uTokenID)})

data, err := h.indexService.GetLatestObject(fCtx.Context(), h.cloudEventBucket, opts)
indexKey, err := h.indexService.GetLatestIndexKey(fCtx.Context(), opts)
if err != nil {
return handleDBError(err, h.logger)
}
data, err := fetch.GetObjectFromIndex(fCtx.Context(), h.indexService, indexKey, []string{h.cloudEventBucket, h.ephemeralBucket})
if err != nil {
return handleDBError(err, h.logger)
}

return fCtx.JSON(data)
}

Expand Down
16 changes: 13 additions & 3 deletions internal/fetch/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/DIMO-Network/fetch-api/internal/fetch"
"github.com/DIMO-Network/fetch-api/pkg/grpc"
"github.com/DIMO-Network/nameindexer/pkg/clickhouse/indexrepo"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand Down Expand Up @@ -52,10 +53,15 @@ func (s *Server) GetIndexKeys(ctx context.Context, req *grpc.GetIndexKeysRequest
// GetObjects translates the gRPC call to the indexrepo type and fetches data for the given options.
func (s *Server) GetObjects(ctx context.Context, req *grpc.GetObjectsRequest) (*grpc.GetObjectsResponse, error) {
options := translateSearchOptions(req.GetOptions())
data, err := s.indexService.GetObject(ctx, s.cloudEventBucket, int(req.GetLimit()), options)
idxKeys, err := s.indexService.GetIndexKeys(ctx, int(req.GetLimit()), options)
if err != nil {
return nil, fmt.Errorf("failed to get objects: %w", err)
}
data, err := fetch.GetObjectsFromIndexs(ctx, s.indexService, idxKeys, []string{s.cloudEventBucket, s.ephemeralBucket})
if err != nil {
return nil, fmt.Errorf("failed to get latest object: %w", err)
}

dataObjects := make([]*grpc.DataObject, len(data))
for i, d := range data {
dataObjects[i] = &grpc.DataObject{
Expand All @@ -69,7 +75,11 @@ func (s *Server) GetObjects(ctx context.Context, req *grpc.GetObjectsRequest) (*
// GetLatestObject translates the gRPC call to the indexrepo type and fetches the latest data for the given options.
func (s *Server) GetLatestObject(ctx context.Context, req *grpc.GetLatestObjectRequest) (*grpc.GetLatestObjectResponse, error) {
options := translateSearchOptions(req.GetOptions())
latestData, err := s.indexService.GetLatestObject(ctx, s.cloudEventBucket, options)
idxKey, err := s.indexService.GetLatestIndexKey(ctx, options)
if err != nil {
return nil, fmt.Errorf("failed to get latest object: %w", err)
}
latestData, err := fetch.GetObjectFromIndex(ctx, s.indexService, idxKey, []string{s.cloudEventBucket, s.ephemeralBucket})
if err != nil {
return nil, fmt.Errorf("failed to get latest object: %w", err)
}
Expand All @@ -81,7 +91,7 @@ func (s *Server) GetLatestObject(ctx context.Context, req *grpc.GetLatestObjectR

// GetObjectsFromIndexKeys translates the gRPC call to the indexrepo type and fetches data for the given options.
func (s *Server) GetObjectsFromIndexKeys(ctx context.Context, req *grpc.GetObjectsFromIndexKeysRequest) (*grpc.GetObjectsFromIndexKeysResponse, error) {
data, err := s.indexService.GetObjectsFromIndexKeys(ctx, req.GetIndexKeys(), s.cloudEventBucket)
data, err := fetch.GetObjectsFromIndexs(ctx, s.indexService, req.GetIndexKeys(), []string{s.cloudEventBucket, s.ephemeralBucket})
if err != nil {
return nil, fmt.Errorf("failed to get objects: %w", err)
}
Expand Down

0 comments on commit 7b53ec0

Please sign in to comment.