Skip to content

Commit

Permalink
Ensure Messages Save and Send and Load
Browse files Browse the repository at this point in the history
This commit ensures that we are able to still send messages but this
time to a group ID instead of a single person. This also ensures
that we are able to pull those messages, by the group ID, in the
future and display them correctly
  • Loading branch information
Tim Roberts committed Feb 10, 2024
1 parent ca45fd8 commit 415900a
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 15 deletions.
36 changes: 35 additions & 1 deletion client/src/pages/Messages.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import { useUser } from '../hooks/useuser'
import { WebSocketMessage } from '../hooks/usewebsocket'
import Log from '../log'
import WebSocketContext from '../contexts/websocket'
import { getMessageForGroup } from '../services/messages'
import { useSession } from '@clerk/clerk-react'

const Page = styled.main`
padding: 1rem;
Expand Down Expand Up @@ -277,13 +279,43 @@ const Profile = () => {
const [messages, setMessages] = useState<WebSocketMessage[]>([])
const { sendMessage, lastMessage } = useContext(WebSocketContext)
const { user } = useUser()
const { session } = useSession()
const groupId = '1980b837-6506-4b53-8363-f8d21c43822e'

useEffect(() => {
if (lastMessage) {
setMessages((msgs) => [...msgs, lastMessage])
}
}, [lastMessage])

useEffect(() => {
const getPastMessages = async () => {
const token = await session?.getToken()

if (token) {
const result = await getMessageForGroup(groupId, token)
const mapped: WebSocketMessage[] = result.map((dbMessages) => ({
action: '@@MESSAGE/RECEIVE',
payload: {
message: dbMessages.message,
receiverId: dbMessages.group_id,
},
metadata: {
authorId: dbMessages.author_id,
receivedAt: dbMessages.created_at,
},
id: dbMessages._id,
}))

setMessages((msg) => [...mapped, ...msg])
}
}

if (session && user) {
getPastMessages()
}
}, [user, setMessages, groupId, session])

const handleFormSubmit: React.FormEventHandler<HTMLFormElement> = useCallback(
(e) => {
e.preventDefault()
Expand All @@ -296,7 +328,9 @@ const Profile = () => {
sendMessage({
action: '@@MESSAGES/SAVE',
payload: {
groupId: user.id,
// TODO: get group ID from state
// this is hardcoded
groupId: '1980b837-6506-4b53-8363-f8d21c43822e',
message,
},
metadata: {
Expand Down
8 changes: 8 additions & 0 deletions client/src/services/messages.schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export interface Message {
_id: string
message: string
sent_at: string
author_id: string
group_id: string
created_at: string
}
17 changes: 17 additions & 0 deletions client/src/services/messages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { ROOT_URL } from './config'
import { Message } from './messages.schema'

const MESSAGES_BASE_URL = `${ROOT_URL}/messages`

export const getMessageForGroup = async (
groupId: string,
token: string,
): Promise<Message[]> => {
const result = await fetch(`${MESSAGES_BASE_URL}/groups/${groupId}`, {
headers: {
Authorization: `Bearer ${token}`,
},
})

return result.json()
}
1 change: 1 addition & 0 deletions connections/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (cache *CacheConnection) Publish(cmd commands.PublishCmd) error {
body, err := json.Marshal(cmd.Data)

if err != nil {
slog.Warn("There was an error trying to publish the command", slog.Any("error", err))
return err
}

Expand Down
2 changes: 1 addition & 1 deletion connections/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (database *DatabaseConnection) Disconnect(ctx context.Context) {
database.conn.Close()
}

func (database *DatabaseConnection) QueryRow(sql string, target any, params ...any) pgx.Row {
func (database *DatabaseConnection) QueryRow(sql string, params ...any) pgx.Row {
return database.conn.QueryRow(context.TODO(), sql, params...)
}

Expand Down
2 changes: 2 additions & 0 deletions connections/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func (subscriptions *ISubscriptions) Unsubscribe(ctx context.Context, topic stri
func (subscriptions *ISubscriptions) Subscribe(ctx context.Context, topic string) <-chan commands.PubSubCommand {
_, exists := subscriptions.currentSubscriptions[topic]

slog.Debug("We are subscribing to a topic", slog.String("topic", topic))

if exists {
// we already have a listener for this subscription so
// we don't need to do anything
Expand Down
11 changes: 7 additions & 4 deletions data/messages.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package data

import (
"log/slog"
"mck-p/goact/connections"
"time"

Expand Down Expand Up @@ -43,7 +44,7 @@ func (messages *IMessages) SaveMessage(msg NewMessage) (*Message, error) {

row := connections.Database.QueryRow(sql, msg.Message, msg.AuthorId, msg.GroupId)

return &message, row.Scan(&message.Id, &message.AuthorId, &message.GroupId)
return &message, row.Scan(&message.Id, &message.AuthorId, &message.GroupId, &message.CreatedAt)
}

type MessageGroupQuery struct {
Expand All @@ -65,9 +66,9 @@ func (messages *IMessages) GetMessagesForGroup(query MessageGroupQuery) ([]*Mess
messages
WHERE
group_id = $1
ORDER BY created_at DESC
LIMIT $2
OFFSET $3
ORDER BY created_at $4;
OFFSET $3;
`

if query.Limit == 0 {
Expand All @@ -78,7 +79,9 @@ func (messages *IMessages) GetMessagesForGroup(query MessageGroupQuery) ([]*Mess
query.OrderBy = "DESC"
}

rows, err := connections.Database.Query(sql, query.GroupId, query.Limit, query.Offset, query.OrderBy)
slog.Debug("Selecting messages", slog.Any("query", query))

rows, err := connections.Database.Query(sql, query.GroupId, query.Limit, query.Offset)

if err != nil {
return []*Message{}, err
Expand Down
4 changes: 2 additions & 2 deletions data/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (users *UserData) GetUserByExternalId(id string) (*User, error) {
SELECT _id as id, externalid FROM users WHERE externalId = $1;
`

row := connections.Database.QueryRow(sql, &user, id)
row := connections.Database.QueryRow(sql, id)

return &user, row.Scan(&user.Id, &user.ExternalId)

Expand All @@ -41,7 +41,7 @@ func (users *UserData) CreateUser(cmd CreateUserCmd) (*User, error) {
RETURNING _id as id, externalId
`

row := cmd.Connection.QueryRow(sql, &user, cmd.ExternalId)
row := cmd.Connection.QueryRow(sql, cmd.ExternalId)

return &user, row.Scan(&user.Id, &user.ExternalId)
}
2 changes: 1 addition & 1 deletion domains/domains.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (domains *IDomains) Process(cmd Command) {
_, span := tracer.Tracer.Start(cmd.CTX, "Domains::Process")
defer span.End()

slog.Info("Processing command", slog.String("id", cmd.Id))
slog.Info("Processing command", slog.String("id", cmd.Id), slog.String("action", cmd.Action))

if Messages.ShouldHandle(cmd.Action) {
wg.Add(1)
Expand Down
22 changes: 16 additions & 6 deletions domains/messages.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package domains

import (
"context"
"fmt"
"log/slog"
"mck-p/goact/commands"
Expand All @@ -21,7 +22,7 @@ type MessageDomain struct{}
var Messages = &MessageDomain{}

func (messages *MessageDomain) ShouldHandle(action string) bool {
return action == SendMessage || action == EchoMessage
return action == SendMessage || action == EchoMessage || action == SaveMessage
}

func MessageToTopic(actorId string) string {
Expand All @@ -33,7 +34,7 @@ func (messages *MessageDomain) Process(cmd Command, wg *sync.WaitGroup) error {
defer span.End()
defer wg.Done()

slog.Info("Handling Message command", slog.Any("command", cmd.Id))
slog.Info("Handling Message command", slog.Any("command", cmd.Id), slog.String("action", cmd.Action))

switch cmd.Action {
case EchoMessage:
Expand All @@ -51,7 +52,7 @@ func (messages *MessageDomain) Process(cmd Command, wg *sync.WaitGroup) error {
},
})
case SendMessage:
connections.Cache.Publish(commands.PublishCmd{
err := connections.Cache.Publish(commands.PublishCmd{
CTX: cmd.CTX,
Topic: MessageToTopic(cmd.Payload["receiverId"].(string)),
Data: commands.PubSubCommand{
Expand All @@ -62,14 +63,21 @@ func (messages *MessageDomain) Process(cmd Command, wg *sync.WaitGroup) error {
Payload: cmd.Payload,
},
})

if err != nil {
slog.Warn("Error publishg", slog.Any("error", err))
return err
}
case SaveMessage:
slog.Debug("Trying to handle Save MEssage", slog.Any("payload", cmd.Payload), slog.Any("actor", cmd.ActorId))
msg, err := data.Messages.SaveMessage(data.NewMessage{
AuthorId: cmd.ActorId,
Message: cmd.Payload["message"].(string),
GroupId: cmd.Payload["groupId"].(string),
})

if err != nil {
slog.Warn("error when trying to save message", slog.Any("error", err))
return err
}

Expand All @@ -91,12 +99,14 @@ func (messages *MessageDomain) Process(cmd Command, wg *sync.WaitGroup) error {
// message
cmd.Dispatch(Command{
Id: fmt.Sprintf("sub::%s", cmd.Id),
ActorId: SendMessage,
Action: SendMessage,
ActorId: cmd.ActorId,
Payload: commands.Payload{
"recieverId": userId,
"receiverId": userId,
"message": cmd.Payload["message"].(string),
},
Metadata: cmd.Metadata,
CTX: cmd.CTX,
CTX: context.Background(),
DispatchOutgoing: cmd.DispatchOutgoing,
})
}(user.Id)
Expand Down
1 change: 1 addition & 0 deletions migrations/20240209182921_messages.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ CREATE TABLE messages(
author_id UUID NOT NULL,
group_id UUID NOT NULL,
message TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (_id),
FOREIGN KEY (author_id) REFERENCES users(_id),
FOREIGN KEY (group_id) REFERENCES message_groups(_id)
Expand Down
1 change: 1 addition & 0 deletions server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ func (handlers *Handler) WebsocketHandler(c *websocket.Conn) {

go func(conn *websocket.Conn) {
for msg := range messagesForUserChannel {
slog.Debug("We got a message for the user on the websocket", slog.String("id", msg.Id))
select {
case <-quit:
slog.Debug("We have been asked to quit the messaging for user channel")
Expand Down

0 comments on commit 415900a

Please sign in to comment.