Skip to content

Instantly share code, notes, and snippets.

@debsouryadatta
Created November 6, 2025 09:36
Show Gist options
  • Select an option

  • Save debsouryadatta/cd0081811e9e8102225e8cbdf453d6cf to your computer and use it in GitHub Desktop.

Select an option

Save debsouryadatta/cd0081811e9e8102225e8cbdf453d6cf to your computer and use it in GitHub Desktop.
csv upload
// Endpoint
// POST /csv — bulk-creates users for the caller's community from an uploaded
// CSV file.
//
// Flow: authenticate -> validate the multipart form -> resolve the caller's
// community -> spool the upload to /tmp -> authRepo.bulkInsertUsers -> map the
// result to an HTTP response. The temp file is removed on every exit path.
//
// Responses:
//   201 { success: true, message }    bulkInsertUsers reported success
//   400 { error }                     no community found for the user
//   400 { success: false, message }   bulkInsertUsers reported a failure
//   409 { success: false, message }   failure whose error code is "23505"
//   500 { error, details }            an exception escaped the processing block
uploadRoutes.post(
  "/csv",
  // Authenticate first so unauthenticated requests are rejected before the
  // form body is parsed and validated.
  jwtMiddleware,
  zValidator("form", createDocumentSchema),
  async (c) => {
    const fileData = c.req.valid("form");
    const payload = c.get("jwtPayload");
    console.log(payload.id);
    const community = await authRepo.getCommunityByUserId(payload.id);
    if (!community) {
      return c.json(
        { error: "Unable to determine community for the user." },
        400
      );
    }
    const defaultCommunityId = community.id;
    console.log("Uploading users with csv file");

    // Declared outside the try so the finally block can clean it up.
    let localFilePath: string | undefined;
    try {
      // The file name is client-controlled: keep only its last path segment
      // and replace unusual characters so it can never escape /tmp.
      const safeName = path
        .basename(fileData.file.name)
        .replace(/[^\w.-]/g, "_");
      // Generate a unique key for the file
      const uniqueKey = `${nanoid()}-${safeName}`;
      // Use /tmp directory for Lambda
      localFilePath = path.join("/tmp", uniqueKey);
      // Save the file locally in /tmp
      fs.writeFileSync(
        localFilePath,
        Buffer.from(await fileData.file.arrayBuffer())
      );

      // Process the CSV and insert into the database
      const result = await authRepo.bulkInsertUsers(
        localFilePath,
        defaultCommunityId
      );

      if (!result?.success) {
        const err: any = result?.error ?? {};
        // "23505" is mapped to 409 Conflict; every other failure is a 400.
        const status = err?.code === "23505" ? 409 : 400;

        const messageParts = [result.message];
        // Empty arrays are truthy, so test the length to avoid dangling
        // "Missing departments: " / "Missing roles: " fragments.
        if (result.missingDepartments?.length) {
          messageParts.push(
            `Missing departments: ${result.missingDepartments.join(", ")}`
          );
        }
        if (result.missingRoles?.length) {
          // Entries are { role, department } objects; format them explicitly
          // instead of letting join() stringify them as "[object Object]".
          const roleLabels = result.missingRoles.map(
            (item) => `${item.role} (${item.department})`
          );
          messageParts.push(`Missing roles: ${roleLabels.join(", ")}`);
        }
        return c.json(
          {
            success: false,
            message: messageParts.join(". "),
          },
          status
        );
      }

      return c.json({ success: true, message: result.message }, 201);
    } catch (error) {
      console.error("Error during CSV upload and processing:", error);
      // An Error instance serialises to "{}" in JSON, so send its message.
      const details = error instanceof Error ? error.message : String(error);
      return c.json({ error: "Failed to process file", details }, 500);
    } finally {
      // Best-effort cleanup of the spooled upload; a cleanup failure must not
      // replace the response that was already chosen above.
      if (localFilePath && fs.existsSync(localFilePath)) {
        try {
          fs.unlinkSync(localFilePath);
        } catch (cleanupError) {
          console.warn("Failed to remove temporary CSV file:", cleanupError);
        }
      }
    }
  }
);
// Function
/**
 * Bulk-creates users from a CSV file on disk.
 *
 * Steps:
 *  1. Load the community row for `defaultCommunityId` and read its
 *     `department_info` list.
 *  2. Parse the CSV (the header row supplies the column names) and drop rows
 *     whose cells are all blank.
 *  3. Fail the whole file if a department named in it is not an active entry
 *     of `department_info`, or if a role is not listed in its department's
 *     `rolesInDepartment`.
 *  4. Skip rows that have neither phone_number nor email, rows whose email is
 *     already present in `users` or was already accepted from this file, rows
 *     without a role, and rows whose role is not found in the roles table.
 *  5. Insert the remaining rows with a single insert statement.
 *
 * Exceptions raised while processing are caught and reported through the
 * returned object rather than thrown.
 *
 * @param csvPath - Path of the CSV file to read.
 * @param defaultCommunityId - Community whose `department_info` is used for
 *   validation, and the community assigned to rows with no `community_id`.
 * @returns `{ success, message }`; validation failures also carry
 *   `missingDepartments` and `missingRoles`, and caught exceptions are
 *   returned as `error`.
 */
bulkInsertUsers = async (csvPath: string, defaultCommunityId: string) => {
  /** Trimmed text of a CSV cell; "" when the cell is null/undefined. */
  const cell = (value: unknown): string =>
    typeof value === "string" ? value.trim() : String(value ?? "").trim();

  /** True when every cell of the row is null, undefined or whitespace. */
  const isBlankRow = (row: Record<string, unknown> | null | undefined): boolean =>
    Object.values(row || {}).every(
      (val) =>
        val === undefined ||
        val === null ||
        (typeof val === "string" && val.trim() === "")
    );

  try {
    console.log(`Starting bulk insert users from CSV: ${csvPath}`);
    const records: any[] = [];
    // Map rather than a plain object: a role named "constructor" or
    // "toString" would otherwise resolve to an Object.prototype member and
    // bypass the database lookup.
    const roleCache = new Map<string, string>();
    let skippedDuplicates = 0;

    // Fetch community details with department_info
    console.log(`Fetching community details for community ID: ${defaultCommunityId}`);
    const communityDetails = await db
      .select()
      .from(community)
      .where(eq(community.id, defaultCommunityId));
    const communityRow = communityDetails?.[0];
    if (!communityRow) {
      console.warn(`Community not found for ID: ${defaultCommunityId}`);
      return {
        success: false,
        message: "Community not found.",
      };
    }
    const rawDepartmentInfo = communityRow.department_info as unknown;
    const departmentInfo: any[] = Array.isArray(rawDepartmentInfo)
      ? rawDepartmentInfo
      : [];

    // Index the active departments by name once (first match wins) instead of
    // scanning the list for every CSV row.
    const activeDepartments = new Map<string, any>();
    for (const dept of departmentInfo) {
      if (dept?.status === "active" && !activeDepartments.has(dept.name)) {
        activeDepartments.set(dept.name, dept);
      }
    }

    // First pass: read the CSV, keep non-blank rows, collect department names.
    const csvDepartments = new Set<string>();
    const tempRecords: any[] = [];
    const source = fs.createReadStream(csvPath);
    const parser = source.pipe(
      parse({
        columns: true,
        skip_empty_lines: true,
        // Strip a UTF-8 byte order mark so the first header name stays intact.
        bom: true,
      })
    );
    // pipe() does not forward errors: hand read failures to the parser so the
    // for-await loop rejects and the outer catch reports them.
    source.on("error", (streamError) => parser.destroy(streamError));

    for await (const row of parser) {
      if (isBlankRow(row)) {
        console.warn("Skipping empty row");
        continue;
      }
      const departmentName = cell(row.department);
      if (departmentName) csvDepartments.add(departmentName);
      tempRecords.push(row);
    }

    // Validate departments against department_info
    const missingDepartments: string[] = Array.from(csvDepartments).filter(
      (department) => !activeDepartments.has(department)
    );

    // Validate roles within their respective departments
    const missingRoles: { role: string; department: string }[] = [];
    const reportedRoleKeys = new Set<string>();
    for (const row of tempRecords) {
      const roleName = cell(row.role);
      const departmentName = cell(row.department);
      if (!roleName || !departmentName) continue;
      const deptInfo = activeDepartments.get(departmentName);
      // Unknown departments are already reported via missingDepartments.
      if (!deptInfo) continue;
      const rolesInDept: unknown[] = Array.isArray(deptInfo.rolesInDepartment)
        ? deptInfo.rolesInDepartment
        : [];
      if (rolesInDept.includes(roleName)) continue;
      // Report each role/department combination only once.
      const key = JSON.stringify([roleName, departmentName]);
      if (!reportedRoleKeys.has(key)) {
        reportedRoleKeys.add(key);
        missingRoles.push({ role: roleName, department: departmentName });
      }
    }

    // Return error if there are missing departments or roles
    if (missingDepartments.length > 0 || missingRoles.length > 0) {
      const errors: string[] = [];
      if (missingDepartments.length > 0) {
        errors.push(
          `The following departments need to be added to the community: ${missingDepartments.join(", ")}`
        );
      }
      if (missingRoles.length > 0) {
        const roleMessages = missingRoles.map(
          (item) => `'${item.role}' in department '${item.department}'`
        );
        errors.push(
          `The following roles need to be added to their respective departments: ${roleMessages.join("; ")}`
        );
      }
      return {
        success: false,
        message: "Validation failed: " + errors.join(". "),
        missingDepartments,
        missingRoles,
      };
    }

    // Collect the distinct emails from the CSV. Each one is queried both as
    // written and lower-cased, because matches are compared lower-cased below.
    // NOTE(review): rows stored with other casing are still missed if the
    // email column compares case-sensitively — confirm the column collation.
    const emailsToCheck = new Set<string>();
    for (const row of tempRecords) {
      const email = cell(row.email);
      if (email !== "") {
        emailsToCheck.add(email);
        emailsToCheck.add(email.toLowerCase());
      }
    }
    const existingEmails = new Set<string>();
    if (emailsToCheck.size > 0) {
      const existingUsers = await db
        .select({ email: users.email })
        .from(users)
        .where(inArray(users.email, Array.from(emailsToCheck)));
      for (const user of existingUsers) {
        if (user.email) {
          existingEmails.add(user.email.toLowerCase());
        }
      }
    }

    // Emails of rows accepted so far, to avoid duplicates within the file.
    const acceptedEmailsInCsv = new Set<string>();

    // Second pass: turn the remaining rows into insertable records.
    // Blank rows were already dropped in the first pass.
    for (const row of tempRecords) {
      // Guard: must have at least one identifier (phone or email)
      const phone = cell(row.phone_number);
      const emailAddr = cell(row.email);
      if (!phone && !emailAddr) {
        console.warn("Skipping row without phone_number and email", row);
        continue;
      }
      const emailKey = emailAddr.toLowerCase();

      // Check if email already exists in database
      if (emailAddr && existingEmails.has(emailKey)) {
        console.warn(`Skipping row: email "${emailAddr}" already exists in database.`);
        skippedDuplicates++;
        continue;
      }
      // Check if email already appeared in this CSV file
      if (emailAddr && acceptedEmailsInCsv.has(emailKey)) {
        console.warn(`Skipping row: email "${emailAddr}" appears multiple times in CSV.`);
        skippedDuplicates++;
        continue;
      }

      // Guard role: if role is absent, skip the row instead of throwing
      const roleName = cell(row.role);
      if (!roleName) {
        console.warn("Skipping row without role", row);
        continue;
      }
      let roleId = roleCache.get(roleName);
      if (!roleId) {
        // Fetch role_id from the roles table
        const role = await db.query.roles.findFirst({
          where: (roles, { eq }) => eq(roles.name, roleName),
        });
        if (!role) {
          console.warn(
            `Skipping row: role "${roleName}" does not exist in the roles table.`
          );
          continue;
        }
        roleId = role.id;
        roleCache.set(roleName, role.id); // Cache the role_id for future lookups
        console.log("Role ID:", roleId);
      }

      // NOTE(review): a community_id column in the CSV overrides the caller's
      // community, while validation above only used the default community's
      // departments — confirm that this override is intended.
      const communityId = cell(row.community_id || defaultCommunityId);
      if (!communityId) {
        console.warn(
          `Skipping row: Community ID is missing for user: ${cell(row.first_name)} ${cell(row.last_name)}`
        );
        continue;
      }

      // Mark the email as used only once the row is accepted, so a row skipped
      // above does not block a later valid row with the same email.
      if (emailAddr) {
        acceptedEmailsInCsv.add(emailKey);
      }

      records.push({
        id: generatePK(),
        first_name: cell(row.first_name),
        last_name: cell(row.last_name),
        phone_number: phone,
        email: emailAddr,
        managerEmailId: cell(row.managerEmailId),
        department: cell(row.department),
        designation: cell(row.designation),
        community_id: communityId,
        role_id: roleId, // Use the fetched role_id
        isMobileVerified: true,
        isEmailVerified: true,
        user_status: "approved",
        active: true,
      });
    }

    // Insert into the database
    try {
      if (records.length === 0) {
        const message = skippedDuplicates > 0
          ? `No valid rows to insert. Skipped ${skippedDuplicates} duplicate email(s).`
          : "No valid rows to insert.";
        console.warn(message);
        return {
          success: true,
          message,
        };
      }
      console.log(`Inserting ${records.length} users into the database`);
      await db.insert(users).values(records);
      const message = skippedDuplicates > 0
        ? `${records.length} users inserted successfully. Skipped ${skippedDuplicates} duplicate email(s).`
        : `${records.length} users inserted successfully.`;
      console.log(message);
      return {
        success: true,
        message,
      };
    } catch (error) {
      console.error("Error inserting users into database:", error);
      return { success: false, message: "Failed to insert users.", error };
    }
  } catch (error) {
    console.error("Error in bulkInsertUsers function:", error);
    return { success: false, message: "Bulk insert operation failed.", error };
  }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment