Created
October 24, 2024 21:07
-
-
Save tranvansang/e25c9ca1691a472f87271347b5c98588 to your computer and use it in GitHub Desktop.
Copy_Folder_Google_Drive_to_Google_Drive.ipynb
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "nbformat": 4, | |
| "nbformat_minor": 0, | |
| "metadata": { | |
| "colab": { | |
| "provenance": [], | |
| "include_colab_link": true | |
| }, | |
| "kernelspec": { | |
| "name": "python3", | |
| "display_name": "Python 3" | |
| }, | |
| "language_info": { | |
| "name": "python" | |
| } | |
| }, | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "view-in-github", | |
| "colab_type": "text" | |
| }, | |
| "source": [ | |
| "<a href=\"https://colab.research.google.com/gist/tranvansang/e25c9ca1691a472f87271347b5c98588/copy_folder_google_drive_to_google_drive.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "source": [ | |
| "# Copy Folder Google Drive to Google Drive - 1TouchPro" | |
| ], | |
| "metadata": { | |
| "id": "8er7vEN1ApwJ" | |
| } | |
| }, | |
| { | |
| "cell_type": "code", | |
| "source": [ | |
| "#@title Input\n", | |
| "from ipywidgets import widgets\n", | |
| "\n", | |
| "dest_text = widgets.Text(description=\"Your drive\", placeholder='Nhập đường link folder Google Drive của bạn')\n", | |
| "source_text = widgets.Text(description=\"Shared drive\", placeholder='Nhập đường link folder Google Drive shared')\n", | |
| "from_page_text = widgets.Text(description=\"Từ trang\", value=\"0\")\n", | |
| "to_page_text = widgets.Text(description=\"Đến trang\", value=\"0\")\n", | |
| "max_download_size_text = widgets.Text(description=\"Tổng dung lượng tối đa(GB)\", value=\"700\")\n", | |
| "exclude_str_text = widgets.Text(description=\"Bỏ file, folder có chứa nội dung\", value=\"\")\n", | |
| "\n", | |
| "display(dest_text)\n", | |
| "display(source_text)\n", | |
| "display(from_page_text)\n", | |
| "display(to_page_text)\n", | |
| "display(max_download_size_text)\n", | |
| "display(exclude_str_text)" | |
| ], | |
| "metadata": { | |
| "id": "Co6t50QUNhso" | |
| }, | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "source": [ | |
| "#@title Run\n", | |
| "import os\n", | |
| "import time\n", | |
| "import re\n", | |
| "import sys\n", | |
| "from googleapiclient.discovery import build\n", | |
| "from google.colab import auth\n", | |
| "from google.colab import drive\n", | |
| "\n", | |
| "class DownloadFromDrive:\n", | |
| " def __init__(self):\n", | |
| " self._total_size = 0\n", | |
| " self._limit_size = 0\n", | |
| " self.excluded_strings = []\n", | |
| "\n", | |
| " def get_user_credential(self):\n", | |
| " auth.authenticate_user()\n", | |
| " drive_service = build('drive', 'v3')\n", | |
| " return drive_service\n", | |
| "\n", | |
| " def get_childs_from_folder(self, drive_service, folder_id, from_page, to_page):\n", | |
| " files = []\n", | |
| " page_token = None\n", | |
| " query = f\"'{folder_id}' in parents and trashed = false\"\n", | |
| " if self.excluded_strings and len(self.excluded_strings) > 0:\n", | |
| " not_contains_query = \" and \".join([f\"not name contains '{ext}'\" for ext in self.excluded_strings])\n", | |
| " query += f\" and ({not_contains_query})\"\n", | |
| "\n", | |
| " pages = 0\n", | |
| " while True:\n", | |
| " try:\n", | |
| " pages += 1\n", | |
| " response = drive_service.files().list(q=query,\n", | |
| " orderBy='name, createdTime',\n", | |
| " fields='files(id, name, mimeType, size), nextPageToken',\n", | |
| " pageToken=page_token,\n", | |
| " supportsAllDrives=True,\n", | |
| " includeItemsFromAllDrives=True).execute()\n", | |
| "\n", | |
| " if (from_page < pages <= to_page) or to_page == 0:\n", | |
| " files.extend(response.get('files', []))\n", | |
| "\n", | |
| " page_token = response.get('nextPageToken', None)\n", | |
| " if page_token is None or (pages >= to_page > 0):\n", | |
| " break\n", | |
| " except Exception as e:\n", | |
| " print(f\"An error occurred: {str(e)}\")\n", | |
| " page_token = None\n", | |
| "\n", | |
| " print(f\"Total files: {len(files)}\")\n", | |
| " return files\n", | |
| "\n", | |
| " def copy_file(self, drive_service, dest_folder_id, source_file):\n", | |
| " if source_file['mimeType'] != 'application/vnd.google-apps.folder':\n", | |
| " body_file_inf = {'parents': [dest_folder_id]}\n", | |
| "\n", | |
| " if not self.check_if_exists(drive_service, dest_folder_id, source_file['name']):\n", | |
| " try:\n", | |
| " start_time = time.time()\n", | |
| " request_copy = drive_service.files().copy(body=body_file_inf, fileId=source_file['id'],\n", | |
| " supportsAllDrives=True).execute()\n", | |
| " end_time = time.time()\n", | |
| "\n", | |
| " fileSize = int(source_file.get('size', 0))\n", | |
| " size_mb = fileSize / (1024 * 1024)\n", | |
| " self._total_size += size_mb\n", | |
| " speed_mb = size_mb / (end_time - start_time)\n", | |
| " print(f\"[{source_file['name']}] copied. Size {size_mb:0.2f} MB. Speed {speed_mb:0.2f} MB/s\")\n", | |
| "\n", | |
| "\n", | |
| " if self._total_size >= (self._limit_size * 1024):\n", | |
| " self.on_total_size_exceeded(f\"Total size exceeds {self._limit_size} GB. Ending the program.\")\n", | |
| " except Exception as e:\n", | |
| " print(\"An error occurred: \", e)\n", | |
| " else:\n", | |
| " print(f\"[{source_file['name']}] exists.\")\n", | |
| " else:\n", | |
| "\n", | |
| " source_files = self.get_childs_from_folder(drive_service, source_file['id'], 0, 0)\n", | |
| " if source_files and len(source_files) > 0:\n", | |
| " print(f\"Copy at Folder {source_file['name']} Starting\")\n", | |
| " sub_folder_id = self.create_folder(drive_service, dest_folder_id, source_file['name'])\n", | |
| " self.copy_multiple_files(drive_service, sub_folder_id, source_files)\n", | |
| " print(f\"Copy at Folder {source_file['name']} Ending\")\n", | |
| "\n", | |
| "\n", | |
| " def create_folder(self, drive_service, dest_folder_id, sub_folder_name):\n", | |
| " sub_folder_inf = {'name': sub_folder_name, 'mimeType': 'application/vnd.google-apps.folder', 'parents': [dest_folder_id]}\n", | |
| "\n", | |
| " exist_folder_id = self.check_if_exists(drive_service, dest_folder_id, sub_folder_name)\n", | |
| " if not exist_folder_id:\n", | |
| " try:\n", | |
| " folder = drive_service.files().create(body=sub_folder_inf, fields='id').execute()\n", | |
| " return folder['id']\n", | |
| " except Exception as e:\n", | |
| " print(\"An error occurred: \", e)\n", | |
| " return exist_folder_id\n", | |
| "\n", | |
| "\n", | |
| " def check_if_exists(self, drive_service, dest_folder_id, name):\n", | |
| " try:\n", | |
| " processed_name = name.replace(\"'\", \"\\\\'\")\n", | |
| "\n", | |
| " results = drive_service.files().list(q=f\"'{dest_folder_id}' in parents and name contains '{processed_name}' and trashed=false\",\n", | |
| " fields='files(id)').execute()\n", | |
| "\n", | |
| " if 'files' in results and len(results['files']) > 0:\n", | |
| " return results['files'][0]['id']\n", | |
| " except Exception as e:\n", | |
| " print(\"An error occurred: \", e)\n", | |
| "\n", | |
| " return \"\"\n", | |
| "\n", | |
| "\n", | |
| " def copy_multiple_files(self, drive_service, dest_folder_id, source_files):\n", | |
| " for source_file in source_files:\n", | |
| " self.copy_file(drive_service, dest_folder_id, source_file)\n", | |
| "\n", | |
| " def extract_folder_id_from_url(self, url):\n", | |
| " pattern = r'[-\\w]{25,}'\n", | |
| " match = re.search(pattern, url)\n", | |
| " if match:\n", | |
| " return match.group(0)\n", | |
| " else:\n", | |
| " return None\n", | |
| "\n", | |
| " def on_total_size_exceeded(self, message):\n", | |
| " print(message)\n", | |
| " sys.exit()\n", | |
| "\n", | |
| " def copy_drive_to_drive(self, destDriveLink, sourceDriveLink, from_page, to_page):\n", | |
| " service = self.get_user_credential()\n", | |
| "\n", | |
| " start_time = time.time()\n", | |
| " dest_folder_id = self.extract_folder_id_from_url(destDriveLink)\n", | |
| " source_folder_id = self.extract_folder_id_from_url(sourceDriveLink)\n", | |
| " source_folder = service.files().get(fileId=source_folder_id, supportsAllDrives=True).execute()\n", | |
| " new_dest_folder_id = self.create_folder(service, dest_folder_id, source_folder['name'])\n", | |
| "\n", | |
| " source_files = self.get_childs_from_folder(service, source_folder_id, from_page, to_page)\n", | |
| " self.copy_multiple_files(service, new_dest_folder_id, source_files)\n", | |
| " end_time = time.time()\n", | |
| "\n", | |
| " size_gb = self._total_size / 1024\n", | |
| " speed_mb = self._total_size / (end_time - start_time)\n", | |
| "\n", | |
| " print(f\"Done. Total Size {size_gb:0.2f} GB. Total Time {int(end_time - start_time)} s. SpeedMB {speed_mb:0.2f} MB/s\")\n", | |
| "\n", | |
| "\n", | |
| "# Main\n", | |
| "destDriveLink = dest_text.value\n", | |
| "sourceDriveLink = source_text.value\n", | |
| "fromPage = int(from_page_text.value)\n", | |
| "toPage = int(to_page_text.value)\n", | |
| "\n", | |
| "downloader = DownloadFromDrive()\n", | |
| "downloader._limit_size = float(max_download_size_text.value);\n", | |
| "downloader.excluded_strings = [ext.strip() for ext in exclude_str_text.value.split(\",\") if ext.strip()]\n", | |
| "downloader.copy_drive_to_drive(destDriveLink, sourceDriveLink, fromPage, toPage)" | |
| ], | |
| "metadata": { | |
| "id": "dd_MwROX7DxL" | |
| }, | |
| "execution_count": null, | |
| "outputs": [] | |
| } | |
| ] | |
| } |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
http://colab.research.google.com/gist/tranvansang/e25c9ca1691a472f87271347b5c98588/copy_folder_google_drive_to_google_drive.ipynb