Skip to content

Commit

Permalink
Set thread for incoming replies (#254)
Browse files Browse the repository at this point in the history
* Correctly set thread for incoming replies

* Add more logs when determining thread

* Fix error with sent emails

* Correctly return email sent time for sent emails
  • Loading branch information
harryzcy authored Apr 3, 2023
1 parent 9c8ac0f commit 9a0269b
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 25 deletions.
4 changes: 2 additions & 2 deletions internal/email/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func parseGetResult(attributeValues map[string]dynamodbTypes.AttributeValue) (*G
} else {
if result.Type == EmailTypeDraft {
result.TimeUpdated = emailTime
} else {
result.DateSent = emailTime
} else if result.Type == EmailTypeSent {
result.TimeSent = emailTime
}
result.Unread = nil
}
Expand Down
85 changes: 62 additions & 23 deletions internal/email/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package email
import (
"context"
"errors"
"fmt"
"log"
"os"
"strings"
Expand Down Expand Up @@ -144,6 +145,7 @@ type DetermineThreadOutput struct {
// If a thread already exists, the ThreadID is returned and Exists is true.
// If a thread does not exist and a new thread should be created, the ThreadID is randomly generated and ShouldCreate is true.
func DetermineThread(ctx context.Context, api QueryAndGetItemAPI, input *DetermineThreadInput) (*DetermineThreadOutput, error) {
fmt.Println("Determining thread...")
originalMessageID := ""
if len(input.InReplyTo) > 0 {
originalMessageID = input.InReplyTo
Expand All @@ -156,57 +158,94 @@ func DetermineThread(ctx context.Context, api QueryAndGetItemAPI, input *Determi
return &DetermineThreadOutput{}, nil
}

resp, err := api.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String(gsiOriginalIndexName),
KeyConditionExpression: aws.String("OriginalMessageID = :originalMessageID"),
ExpressionAttributeValues: map[string]dynamodbTypes.AttributeValue{
":originalMessageID": &dynamodbTypes.AttributeValueMemberS{Value: originalMessageID},
},
})
if err != nil {
if apiErr := new(dynamodbTypes.ProvisionedThroughputExceededException); errors.As(err, &apiErr) {
return nil, ErrTooManyRequests
}
return nil, err
sesDomain := region + ".amazonses.com"
var possibleSentID string
if strings.HasSuffix(originalMessageID, "@"+sesDomain+">") {
fmt.Println("incoming email is replying to a SES email")
// If the messageID ends with @<SES domain>, it maybe a messageID of a sent email.
// In this case, we need check if there's a corresponding sent email.
possibleSentID = strings.TrimSuffix(originalMessageID, "@"+sesDomain+">")
possibleSentID = strings.TrimPrefix(possibleSentID, "<")
}
// TODO: handle the case where len(resp.Items) > 1
if len(resp.Items) != 1 {
return &DetermineThreadOutput{}, nil

var previousEmail *GetResult
var err error
isSentEmail := false
if possibleSentID != "" {
// Check if the messageID is a sent email first
fmt.Println("checking possible sent email")
previousEmail, err = Get(ctx, api, possibleSentID)
if err != nil && !errors.Is(err, ErrNotFound) {
return nil, err
}
isSentEmail = true
}

searchMessageID := resp.Items[0]["MessageID"].(*dynamodbTypes.AttributeValueMemberS).Value
previousEmail, err := Get(ctx, api, searchMessageID)
if err != nil {
if errors.Is(err, ErrNotFound) {
if previousEmail == nil {
// If the messageID does not corresponded to a sent email, check if it's a received email
fmt.Println("checking original messageID")
var resp *dynamodb.QueryOutput
resp, err = api.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String(gsiOriginalIndexName),
KeyConditionExpression: aws.String("OriginalMessageID = :originalMessageID"),
ExpressionAttributeValues: map[string]dynamodbTypes.AttributeValue{
":originalMessageID": &dynamodbTypes.AttributeValueMemberS{Value: originalMessageID},
},
})
if err != nil {
if apiErr := new(dynamodbTypes.ProvisionedThroughputExceededException); errors.As(err, &apiErr) {
return nil, ErrTooManyRequests
}
return nil, err
}
// TODO: handle the case where len(resp.Items) > 1
if len(resp.Items) == 0 {
return &DetermineThreadOutput{}, nil
}
return nil, err

searchMessageID := resp.Items[0]["MessageID"].(*dynamodbTypes.AttributeValueMemberS).Value
previousEmail, err = Get(ctx, api, searchMessageID)
if err != nil {
if errors.Is(err, ErrNotFound) {
return &DetermineThreadOutput{}, nil
}
return nil, err
}
}

if previousEmail.ThreadID == "" {
// There's no thread for previousEmail, so we need to create a new thread
fmt.Println("determining thread finished: new thread should be created")
threadID := generateThreadID()
return &DetermineThreadOutput{
output := &DetermineThreadOutput{
ThreadID: threadID,
ShouldCreate: true,
CreatingEmailID: previousEmail.MessageID,
CreatingSubject: previousEmail.Subject,
CreatingTime: previousEmail.TimeReceived,
}, nil
}
if isSentEmail {
output.CreatingTime = previousEmail.TimeSent
}
return output, nil
}

if previousEmail.IsThreadLatest {
fmt.Println("determining thread finished: previous email is the latest email in the thread")
return &DetermineThreadOutput{
ThreadID: previousEmail.ThreadID,
Exists: true,
PreviousMessageID: previousEmail.MessageID,
}, nil
}

fmt.Println("determining thread finished: previous email is not the latest email in the thread")
thread, err := GetThread(ctx, api, previousEmail.ThreadID)
if err != nil {
return nil, err
}
fmt.Println("get existing thread finished")
return &DetermineThreadOutput{
ThreadID: previousEmail.ThreadID,
Exists: true,
Expand Down

0 comments on commit 9a0269b

Please sign in to comment.