Skip to content

Commit

Permalink
feat: added background task middleware and worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Jadapema committed Dec 18, 2024
1 parent 28bf2c4 commit 30957b6
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 108 deletions.
8 changes: 4 additions & 4 deletions src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ export default function App() {
themeStretch: false,
}}
>
<Provider store={store}>
<AuthProvider>
<AuthProvider>
<Provider store={store}>
<ThemeProvider>
<MotionLazy>
<SnackbarProvider>
Expand All @@ -85,8 +85,8 @@ export default function App() {
</SnackbarProvider>
</MotionLazy>
</ThemeProvider>
</AuthProvider>
</Provider>
</Provider>
</AuthProvider>
</SettingsProvider>
</LocalizationProvider>
);
Expand Down
10 changes: 0 additions & 10 deletions src/redux/comments/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ const commentsSlice = createSlice({
state.counterLikes[publicationId] -= 1;
}
},

addPendingComment: (state, action: PayloadAction<{ publicationId: string; comment: PendingComment }>) => {
const { publicationId, comment } = action.payload;
if (!state.pendingComments[publicationId]) {
Expand All @@ -61,19 +60,10 @@ const commentsSlice = createSlice({
state.pendingComments[publicationId] = [comment, ...state.pendingComments[publicationId]];
},
removePendingComment: (state, action: PayloadAction<{ publicationId: string; commentId: string;}>) => {
console.log('publicationID', action.payload.publicationId);
console.log('commentID', action.payload.commentId);

const { publicationId, commentId } = action.payload;
const comment = state.comments[publicationId]?.find(comment => comment.id === commentId);

console.log('comment', comment);

// Delete the comment from the pending list
state.pendingComments[publicationId] = state.pendingComments[publicationId].filter(comment => comment.id !== commentId);

// Search by uuid and delete from state

},
},
});
Expand Down
133 changes: 133 additions & 0 deletions src/redux/middlewares/backgroundTaskMiddleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import { Middleware } from '@reduxjs/toolkit';
import { addPendingComment, removePendingComment, refetchCommentsByPublication } from '@redux/comments';
import BackgroundTaskWorker from '@src/workers/backgroundTaskWorker?worker';

type TaskPayload = {
type: string; // Task type identifier, e.g., 'POST_COMMENT', 'UPDATE_METADATA'
data: any; // Task-specific data
};

type Task = {
id: string;
payload: TaskPayload;
};

const backgroundTaskWorker = new BackgroundTaskWorker();
const taskQueue: Task[] = [];
let isProcessing = false;

/**
* Process a single task based on its type.
*/
const processTask = (store: any, task: Task) => {
return new Promise<void>((resolve, reject) => {
const { type, data } = task.payload;

switch (type) {
case 'POST_COMMENT':
processBackgroundComment(store, data)
.then(resolve)
.catch(reject);
break;
default:
console.error(`Unknown task type: ${type}`);
reject(new Error(`Unknown task type: ${type}`));
}
});
};

/**
* Process a background comment task.
*/
const processBackgroundComment = (store: any, data: any) => {
return new Promise<void>((resolve, reject) => {
const { commentOn, uri, pendingComment, owner, generatePayload, sendNotification, createComment, root } = data;

const sessionData = store.getState().auth.session;

// Send the IPFS verification task to the worker
backgroundTaskWorker.postMessage({
type: 'VERIFY_IPFS',
payload: { uri, pendingCommentId: pendingComment.id },
});

backgroundTaskWorker.onmessage = async (e: MessageEvent<{ success: boolean; pendingCommentId: string; error?: string }>) => {
const { success, pendingCommentId, error } = e.data;

if (success) {
try {
// Create the comment after IPFS verification
await createComment({ commentOn, metadata: uri });

// Remove the pending comment from the store
store.dispatch(removePendingComment({ publicationId: commentOn, commentId: pendingCommentId }));

// Generate the notification payload
const notificationPayload = generatePayload('COMMENT', {
id: owner?.id,
displayName: owner?.displayName,
avatar: owner?.avatar,
}, {
root_id: root,
comment_id: commentOn,
rawDescription: `${sessionData?.profile?.metadata?.displayName} left a comment`,
});

// Send the notification if the comment is not from the owner
if (owner?.id !== sessionData?.profile?.id) {
sendNotification(owner.id, sessionData?.profile?.id, notificationPayload);
}

// Refetch the comments
store.dispatch(refetchCommentsByPublication(commentOn));

console.log('Comment created successfully');
resolve();
} catch (createError) {
console.error('Error creating comment:', createError);
reject(createError);
}
} else {
console.error('Error verifying IPFS data:', error);
reject(new Error(error));
}
};
});
};

/**
* Process the task queue sequentially.
*/
const processQueue = (store: any) => {
if (isProcessing || taskQueue.length === 0) return;

isProcessing = true;
const task = taskQueue.shift()!;

processTask(store, task)
.catch((error) => {
console.error('Error processing task:', error);
})
.finally(() => {
isProcessing = false;
processQueue(store); // Process the next task in the queue
});
};

export const backgroundTaskMiddleware: Middleware = (store) => (next) => (action: any) => {
if (action.type === 'ADD_TASK_TO_BACKGROUND') {
// Add the task to the queue
taskQueue.push({ id: action.payload.id, payload: action.payload });

// Immediately add the pending comment to the store if it's a POST_COMMENT task
if (action.payload.type === 'POST_COMMENT') {
const { commentOn, uri, pendingComment } = action.payload.data;
store.dispatch(addPendingComment({ publicationId: commentOn, comment: { ...pendingComment, uri } }));
}

// Start processing the queue if not already processing
processQueue(store);
}

return next(action);
};
3 changes: 2 additions & 1 deletion src/redux/store.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { type Store, type Middleware } from 'redux';
import { configureStore } from '@reduxjs/toolkit';
import rootReducer from '@src/redux/reducer';
import { backgroundTaskMiddleware } from '@redux/middlewares/backgroundTaskMiddleware.ts';

export const createStore = (initialState = {}): Store => {
const middlewares: Middleware[] = [];
const middlewares: Middleware[] = [backgroundTaskMiddleware];

return configureStore({
reducer: rootReducer,
Expand Down
3 changes: 0 additions & 3 deletions src/sections/publication/publication-comments-list.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ export default function PostCommentList({ publicationId: id, showReplies }: Prop
// Join the comments with the pending comments but append the pending comments at the beginning of the list
const commentsWithPending = pendingComments[id] ? [...pendingComments[id], ...(comments ?? [])] : comments;

console.log('comments', comments);
console.log('comments pending', pendingComments[id]);

const commentsFiltered = (commentsWithPending ?? [])
.filter((comment) => !hiddenComments.some((hiddenComment: any) => hiddenComment.id === comment.id))
.filter((comment) => !comment.isHidden)
Expand Down
106 changes: 18 additions & 88 deletions src/sections/publication/publication-details-comment-form.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@ import { alpha } from '@mui/material/styles';
import Iconify from '@src/components/iconify';
// @ts-ignore
import { ReadResult } from '@lens-protocol/react/dist/declarations/src/helpers/reads';
import { uploadMetadataToIPFS, verifyIpfsData } from '@src/utils/ipfs';
import { uploadMetadataToIPFS } from '@src/utils/ipfs';
import uuidv4 from '@src/utils/uuidv4.ts';
import {useDispatch, useSelector} from 'react-redux';
import {
refetchCommentsByPublication,
addPendingComment,
removePendingComment
} from '@redux/comments';
import {useNotifications} from "@src/hooks/use-notifications.ts";
import { useNotificationPayload } from '@src/hooks/use-notification-payload.ts';
import {AnyPublication} from "@lens-protocol/api-bindings";
Expand Down Expand Up @@ -77,41 +72,13 @@ const MovieCommentForm = ({ commentOn, owner, root }: MovieCommentFormProps) =>
const { sendNotification } = useNotifications();
const { generatePayload } = useNotificationPayload(sessionData);

const executeCreateCommentWithRetry = async (
createComment: any,
params: any,
retries = 4,
delayMs = 3000
): Promise<any> => {
for (let attempt = 1; attempt <= retries; attempt++) {
try {
const result = await createComment(params);
if (!result.isFailure()) {
return result;
} else {
console.warn(`Attempt ${attempt}: Failed to create comment. Error: ${result.error.message}`);
}
} catch (error: any) {
console.warn(`Attempt ${attempt}: Error creating comment. Error: ${error.message}`);
}

if (attempt < retries) {
console.log(`Retrying in ${delayMs}ms... (${attempt}/${retries})`);
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
}

throw new Error(`Could not create the comment after ${retries} attempts.`);
};

/**
* Form submission handler.
*
* @param {any} data - Form data.
*/
const onSubmit = handleSubmit(async (data) => {
try {
// HAbilitar el efecto en el comentario
const uuid = uuidv4();

const metadata = textOnly({
Expand Down Expand Up @@ -148,7 +115,6 @@ const MovieCommentForm = ({ commentOn, owner, root }: MovieCommentFormProps) =>
operations: {
hasUpvoted: false
},

by: sessionData?.profile,
createdAt: new Date().toISOString(),
};
Expand All @@ -163,63 +129,27 @@ const MovieCommentForm = ({ commentOn, owner, root }: MovieCommentFormProps) =>
// Upload metadata to IPFS
const uri = await uploadMetadataToIPFS(metadata);

// Send to redux the pending comment
// Dispatch the addPendingComment action
dispatch(addPendingComment({ publicationId: commentOn, comment: {...pendingComment, uri}}));
// Reset
reset(); // Reset the form
// Verify availability of metadata on IPFS / Retries

// Eliminar el efecto

await verifyIpfsData(uri);

// Create comment with retry logic
await executeCreateCommentWithRetry(createComment, {
commentOn: commentOn as any,
metadata: uri,
}).then(() => {
// Update the comment status to confirmed
dispatch(removePendingComment({ publicationId: commentOn, commentId: pendingComment.id}));
// Send notifications to the author of the publication
const notificationPayload = generatePayload('COMMENT', {
id: owner?.id,
displayName: owner?.displayName,
avatar: owner?.avatar,
}, {
root_id: root,
comment_id: commentOn,
rawDescription: `${sessionData?.profile?.metadata?.displayName} left a comment`,
});

// Only notify the author if the comment is not on their own publication
if (owner?.id !== sessionData?.profile?.id) {
sendNotification(owner.id, sessionData?.profile?.id, notificationPayload);
}
dispatch({
type: 'ADD_TASK_TO_BACKGROUND',
payload: {
id: uuidv4(),
type: 'POST_COMMENT',
data: {
commentOn,
uri,
pendingComment,
createComment,
owner,
generatePayload,
sendNotification,
root,
},
},
});

// If execution reaches here, the comment was created successfully
console.log('Comment created successfully');
reset(); // Reset the form
dispatch(refetchCommentsByPublication(commentOn));
reset();
} catch (e: any) {
console.error('Error creating the comment:', e.message);

// Handle specific failure scenarios if necessary
if (e.message.includes('BroadcastingError')) {
console.log('Error broadcasting the transaction:', e.message);
} else if (e.message.includes('PendingSigningRequestError')) {
console.log(
'There is a pending signature request in your wallet. ' +
'Approve it or dismiss it and try again.'
);
} else if (e.message.includes('WalletConnectionError')) {
console.log('Error connecting to the wallet:', e.message);
} else if (e.message.includes('UserRejectedError')) {
// The user decided not to sign
} else {
console.log('Error:', e.message);
}
}
});

Expand Down
4 changes: 2 additions & 2 deletions src/utils/ipfs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ export const uploadMetadataToIPFS = async (metadata: any): Promise<string> => {
* @param delayMs - Delay in milliseconds between retries.
* @returns {Promise<boolean>} - Resolves to true if data is accessible, otherwise throws an error.
*/
export const verifyIpfsData = async (uri: string, retries = 8, delayMs = 2000): Promise<boolean> => {
export const verifyIpfsData = async (uri: string, retries = 16, delayMs = 2000): Promise<boolean> => {
const gateway = 'https://gw.ipfs-lens.dev/ipfs/'; // Use only lens's gateway

const hash = uri.replace('ipfs://', '');
Expand All @@ -127,7 +127,7 @@ export const verifyIpfsData = async (uri: string, retries = 8, delayMs = 2000):
return true;
}
} catch (error: any) {
console.warn(`Attempt ${attempt}: Could not access ${url}. Error: ${error.message}`);
console.log(`Attempt ${attempt}: Could not access ${url}. Error: ${error.message}`);
}
if (attempt < retries) {
console.log(`Retrying in ${delayMs}ms... (${attempt}/${retries})`);
Expand Down
25 changes: 25 additions & 0 deletions src/workers/backgroundTaskWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { verifyIpfsData } from '@src/utils/ipfs';

export type CommentPayload = {
uri: string;
pendingCommentId: string;
};

self.onmessage = async (event: MessageEvent<{ type: string; payload: CommentPayload }>) => {
const { type, payload } = event.data;

if (type === 'VERIFY_IPFS') {
const { uri, pendingCommentId } = payload;

try {
// Verify metadata on IPFS
await verifyIpfsData(uri);

// Send a success message to main thread
self.postMessage({ success: true, pendingCommentId });
} catch (error) {
// Send a error message to main thread
self.postMessage({ success: false, error: (error as Error).message, pendingCommentId });
}
}
};

0 comments on commit 30957b6

Please sign in to comment.