Instantly share code, notes, and snippets.
-
Star
1
(1)
You must be signed in to star a gist -
Fork
0
(0)
You must be signed in to fork a gist
-
-
Save jborean93/13dbfbd94e83ff810afc55e178371eb9 to your computer and use it in GitHub Desktop.
| #!/usr/bin/env python3 | |
| # Copyright: (c) 2024, Jordan Borean (@jborean93) <[email protected]> | |
| # MIT License (see LICENSE or https://opensource.org/licenses/MIT) | |
| # PYTHON_ARGCOMPLETE_OK | |
| # Big thanks to pysmb for help with the RPC structures | |
| # https://github.com/miketeo/pysmb | |
| from __future__ import annotations | |
| import argparse | |
| import collections.abc | |
| import dataclasses | |
| import enum | |
| import sys | |
| import smbclient | |
| from smbprotocol.ioctl import CtlCode, IOCTLFlags, SMB2IOCTLRequest, SMB2IOCTLResponse | |
# argcomplete is optional: record whether it could be imported so that shell
# completion is only wired up when the package is actually available.
try:
    import argcomplete
except ImportError:
    HAS_ARGCOMPLETE = False
else:
    HAS_ARGCOMPLETE = True
class ShareType(enum.IntEnum):
    """Integer enumeration of the share type values known to this script."""

    # Basic resource kinds.
    STYPE_DISKTREE = 0x0000_0000
    STYPE_PRINTQ = 0x0000_0001
    STYPE_DEVICE = 0x0000_0002
    STYPE_IPC = 0x0000_0003

    # Cluster share kinds.
    STYPE_CLUSTER_FS = 0x0200_0000
    STYPE_CLUSTER_SOFS = 0x0400_0000
    STYPE_CLUSTER_DFS = 0x0800_0000
class ShareTypeFlags(enum.IntFlag):
    """Bit flags carried in the high bits of a raw share type value."""

    NONE = 0x0000_0000
    STYPE_TEMPORARY = 0x4000_0000
    STYPE_SPECIAL = 0x8000_0000
@dataclasses.dataclass(frozen=True)
class ShareInfo:
    """Immutable record describing a single share reported by a server."""

    # Name of the share.
    name: str
    # Share type with the flag bits removed.
    share_type: ShareType
    # Flag bits that accompanied the share type.
    share_type_flags: ShareTypeFlags
    # Comment text attached to the share.
    comment: str
def build_dce_rpc_req(call_id: int, packet_type: int, data: bytes) -> bytes:
    """Prefix *data* with the 16 byte DCE/RPC common PDU header.

    Args:
        call_id: Call identifier, written as a 4 byte little endian integer.
        packet_type: PDU type, written as a single byte.
        data: The PDU body that is placed directly after the header.

    Returns:
        The header bytes followed by ``data``.
    """
    header_size = 16

    # Fixed header fields.
    rpc_version = b"\x05\x00"  # Version 5, minor version 0
    fragment_flags = b"\x03"  # First | Last fragment
    data_representation = b"\x10\x00\x00\x00"  # ASCII/Little Endian/IEEE
    auth_length = b"\x00\x00"

    # Fields that depend on the arguments.
    b_packet_type = packet_type.to_bytes(1, byteorder="little")
    b_frag_length = (len(data) + header_size).to_bytes(2, byteorder="little")
    b_call_id = call_id.to_bytes(4, byteorder="little")

    return b"".join(
        (
            rpc_version,
            b_packet_type,
            fragment_flags,
            data_representation,
            b_frag_length,
            auth_length,
            b_call_id,
            data,
        )
    )
def build_dce_rpc_bind(call_id: int) -> bytes:
    """Build the DCE/RPC bind PDU that offers the SRVSVC v3.0 interface.

    Two presentation contexts are offered for the same interface: one using
    the 32bit NDR transfer syntax and one for bind time feature negotiation.

    Args:
        call_id: Call identifier placed in the PDU header.

    Returns:
        The complete bind PDU (packet type 11).
    """
    # SRVSVC interface identifier followed by its version (3.0).
    srvsvc_syntax = (
        b"\xc8\x4f\x32\x4b\x70\x16\xd3\x01\x12\x78\x5a\x47\xbf\x6e\xe1\x88"
        b"\x03\x00\x00\x00"
    )
    # 32bit NDR - 8a885d04-1ceb-11c9-9fe8-08002b104860, version 2
    ndr32_syntax = (
        b"\x04\x5d\x88\x8a\xeb\x1c\xc9\x11\x9f\xe8\x08\x00\x2b\x10\x48\x60"
        b"\x02\x00\x00\x00"
    )
    # Bind Time Feature Negotiation - 6cb71c2c-9812-4540-0300-000000000000
    bind_time_feature_syntax = (
        b"\x2c\x1c\xb7\x6c\x12\x98\x40\x45\x03\x00\x00\x00\x00\x00\x00\x00"
        b"\x01\x00\x00\x00"
    )

    max_xmit_frag = b"\xb8\x10"  # 4280
    max_recv_frag = b"\xb8\x10"  # 4280
    assoc_group = b"\x00\x00\x00\x00"
    num_ctx_items = b"\x02\x00\x00\x00"

    # Each context starts with its id (2 bytes) and the number of transfer
    # syntaxes it carries (1, padded to 2 bytes).
    ctx_ndr32 = b"\x00\x00\x01\x00" + srvsvc_syntax + ndr32_syntax
    ctx_bind_time = b"\x01\x00\x01\x00" + srvsvc_syntax + bind_time_feature_syntax

    bind_body = (
        max_xmit_frag
        + max_recv_frag
        + assoc_group
        + num_ctx_items
        + ctx_ndr32
        + ctx_bind_time
    )
    return build_dce_rpc_req(call_id, 11, bind_body)
def build_net_share_enum(call_id: int, server_name: str) -> bytes:
    """Build the DCE/RPC request PDU for NetrShareEnum at info level 1.

    NET_API_STATUS NetrShareEnum(
        [in, string, unique] SRVSVC_HANDLE ServerName,
        [in, out] LPSHARE_ENUM_STRUCT InfoStruct,
        [in] DWORD PreferedMaximumLength,
        [out] DWORD* TotalEntries,
        [in, out, unique] DWORD* ResumeHandle
    );

    Args:
        call_id: Call identifier placed in the PDU header.
        server_name: Value for the ServerName argument; the NUL terminator is
            appended here.

    Returns:
        The complete request PDU (packet type 0, opnum 15).
    """
    b_server_name = (server_name + "\u0000").encode("utf-16-le")

    # The string counts are expressed in UTF-16 code units, so derive them
    # from the encoded value rather than len(str), which undercounts any
    # character outside the Basic Multilingual Plane.
    server_len = len(b_server_name) // 2
    b_server_len = server_len.to_bytes(4, byteorder="little")

    # An odd number of 2 byte characters leaves the stream 2 bytes short of
    # the 4 byte alignment required by the fields that follow.
    server_len_padding = b"\x00\x00" if server_len % 2 else b""

    stub_data = b"".join(
        [
            # ServerName - Pointer
            b"\x00\x00\x02\x00",  # Referent Id
            b_server_len,  # Max Value Len
            b"\x00\x00\x00\x00",  # Offset
            b_server_len,  # Value Len
            b_server_name,
            server_len_padding,
            # InfoStruct
            (1).to_bytes(4, byteorder="little"),  # Level - SHARE_INFO_1_CONTAINER
            # Remainder of InfoStruct. Presumably, per the SHARE_ENUM_STRUCT
            # layout: union switch (1), container referent id, EntriesRead
            # (0) and a NULL Buffer pointer - confirm against MS-SRVS.
            b"\x01\x00\x00\x00\x04\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00",
            b"\xff\xff\xff\xff",  # PreferedMaximumLength
            b"\x08\x00\x02\x00\x00\x00\x00\x00",  # ResumeHandle
        ]
    )

    rpc_request_data = b"".join(
        [
            # Alloc hint - the real stub size. A fixed value is only right
            # for one particular server name length.
            len(stub_data).to_bytes(4, byteorder="little"),
            # Context ID
            b"\x00\x00",
            # Opnum (NetShareEnumAll),
            (15).to_bytes(2, byteorder="little"),
            stub_data,
        ]
    )

    return build_dce_rpc_req(call_id, 0, rpc_request_data)
def _unpack_ndr_wstr(view: memoryview) -> tuple[str, memoryview]:
    """Decode one counted UTF-16 string from the start of *view*.

    Returns the text without its trailing NUL together with the view advanced
    past the string and its alignment padding.
    """
    # The value is preceded by three 4 byte fields; the third one is the
    # number of characters (including the NUL terminator) that follow.
    char_count = int.from_bytes(view[8:12], byteorder="little")
    text = view[12 : 12 + (char_count - 1) * 2].tobytes().decode("utf-16-le")

    # An odd number of 2 byte characters is followed by 2 bytes of padding.
    if char_count % 2:
        char_count += 1

    return text, view[12 + (char_count * 2) :]


def unpack_smb_share_info(data: memoryview) -> list[ShareInfo]:
    """Parse the share entries out of a NetrShareEnum (level 1) response PDU.

    Args:
        data: The raw DCE/RPC response PDU, starting at the PDU header.

    Returns:
        One ShareInfo per entry in the order the server returned them.

    Raises:
        ValueError: The buffer is not a DCE/RPC response PDU, is too short to
            hold the entries it announces, or contains a share type that is
            not a member of ShareType.
    """
    # Ignore the RPC PDU header (24 bytes) + leading data in the RPC result
    # we don't care about (12 bytes).
    if len(data) < 40:
        raise ValueError(
            f"NetrShareEnum response is too short ({len(data)} bytes) to contain a share count"
        )

    # Byte 2 of the header is the packet type; 2 is a response. Anything else
    # (for example a fault, type 3) does not carry share entries.
    if data[2] != 2:
        raise ValueError(
            f"Expected a DCE/RPC response PDU (type 2) but received type {data[2]}"
        )

    result_len = int.from_bytes(data[36:40], byteorder="little")
    array_view = data[48:]

    # Each fixed size entry is 12 bytes: name pointer, type, comment pointer.
    # Checking up front stops a corrupt count from driving a huge loop.
    if len(array_view) < 12 * result_len:
        raise ValueError(
            f"NetrShareEnum response announces {result_len} entries but is truncated"
        )

    # The string values referenced by the entries follow the whole array.
    str_view = array_view[(12 * result_len) :]

    results = []
    for _ in range(result_len):
        name_ref = int.from_bytes(array_view[0:4], byteorder="little")
        share_type_raw = int.from_bytes(array_view[4:8], byteorder="little")
        comment_ref = int.from_bytes(array_view[8:12], byteorder="little")
        array_view = array_view[12:]

        share_type = ShareType(share_type_raw & 0x0FFFFFFF)
        share_flags = ShareTypeFlags(share_type_raw & 0xF0000000)

        # A null referent id means no string data was marshalled for that
        # pointer, so nothing must be consumed from the string area.
        name = ""
        if name_ref:
            name, str_view = _unpack_ndr_wstr(str_view)

        comment = ""
        if comment_ref:
            comment, str_view = _unpack_ndr_wstr(str_view)

        results.append(ShareInfo(name, share_type, share_flags, comment))

    return results
def get_share_info(server: str) -> list[ShareInfo]:
    """List the shares of *server* by calling NetrShareEnum over the srvsvc pipe.

    The srvsvc named pipe on the server's IPC$ share is opened, a DCE/RPC bind
    is performed and the enumeration request is then sent as an
    FSCTL_PIPE_TRANSCEIVE IOCTL on the same pipe handle.

    Args:
        server: The SMB server to query.

    Returns:
        The shares reported by the server.

    Raises:
        ValueError: The server did not answer the bind with a bind_ack PDU.
    """
    with smbclient.open_file(
        rf"\\{server}\IPC$\srvsvc",
        mode="w+b",
        buffering=0,
        file_type="pipe",
    ) as srvsvc:
        connection = srvsvc.fd.connection
        fid = srvsvc.fd.file_id
        sid = srvsvc.fd.tree_connect.session.session_id
        tid = srvsvc.fd.tree_connect.tree_connect_id

        # Bind to DCE RPC service
        bind_req = build_dce_rpc_bind(1)
        srvsvc.write(bind_req)
        bind_ack = srvsvc.read(1024)

        # Byte 2 of the PDU header is the packet type; 12 is bind_ack. Any
        # other reply (for example bind_nak, type 13) means the interface was
        # not bound and the enumeration request below cannot succeed.
        if len(bind_ack) < 3 or bind_ack[2] != 12:
            received = bind_ack[2] if len(bind_ack) >= 3 else None
            raise ValueError(
                f"Expected a DCE/RPC bind_ack PDU (type 12) from srvsvc but received type {received}"
            )

        # Send the NetShareEnumAll as part of an IOCTL request
        net_share_enum_req = build_net_share_enum(2, rf"\\{server}")

        ioctl_req = SMB2IOCTLRequest()
        ioctl_req["ctl_code"] = CtlCode.FSCTL_PIPE_TRANSCEIVE
        ioctl_req["file_id"] = fid
        ioctl_req["flags"] = IOCTLFlags.SMB2_0_IOCTL_IS_FSCTL
        ioctl_req["max_output_response"] = 8196
        ioctl_req["buffer"] = net_share_enum_req

        req = connection.send(ioctl_req, sid=sid, tid=tid)
        resp = connection.receive(req)

        ioctl_resp = SMB2IOCTLResponse()
        ioctl_resp.unpack(resp["data"].get_value())

        # NOTE(review): the IOCTL response status is not inspected here -
        # confirm that connection.receive surfaces failures on its own.
        net_share_enum_resp = ioctl_resp["buffer"].get_value()

        return unpack_smb_share_info(memoryview(net_share_enum_resp))
def parse_args(args: collections.abc.Sequence[str]) -> argparse.Namespace:
    """Parse the command line arguments.

    Args:
        args: The arguments to parse, without the program name.

    Returns:
        The parsed namespace; ``server`` is a one element list because the
        positional is declared with ``nargs=1``.
    """
    cli = argparse.ArgumentParser(description="List SMB shares for the specific server.")
    cli.add_argument("server", nargs=1, default="", type=str, help="The SMB server to list the share for.")

    # Shell completion is only hooked up when argcomplete could be imported.
    if HAS_ARGCOMPLETE:  # pragma: nocover
        argcomplete.autocomplete(cli)

    return cli.parse_args(args)
def main(args: collections.abc.Sequence[str]) -> None:
    """Print every share of the server named on the command line.

    Args:
        args: The command line arguments, without the program name.
    """
    options = parse_args(args)
    # The positional is parsed with nargs=1, so the name is the only element.
    server = options.server[0]

    for entry in get_share_info(server):
        print(entry)


if __name__ == "__main__":
    main(sys.argv[1:])
Thank you for sharing. I am not very familiar with the SMB protocol, but I think I can understand your concern.
Fortunately, in my project we decided to use only the part that belongs to the SMB protocol, so we don't need the list feature for now.
But I still hope this feature can be migrated into the smbprotocol package, even though it is not actually part of the SMB protocol. Not everyone knows these details, and most of us want an all-in-one solution for manipulating shared resources; I would expect that everything I can do with a Windows shared folder can also be done programmatically.
Maybe you could consider putting them into a separate module with a name like extra or helper, to indicate that this is not actually part of the SMB protocol. 😉
an example of using it with a handler obtained from Open ... handler with the following requests.