Detecting USB drive insertion & removal on Windows using Python

I was looking for a way to monitor USB drives and trigger a backup when I plug in my backup drive. Most solutions online describe a way to do it in C# or C++, but I wanted to write one in Python.

My first naive attempt was to get the list of drives, and poll for changes every couple of seconds. I've explained this method below and it works fine.

However I wasn't really comfortable with launching a subprocess every second. It seemed wasteful. So I've written a second method that uses Win32 APIs to hook into Windows and monitor drive changes.

If you're interested, I have another post where I build a clipboard listener that you can use to monitor clipboard contents.

Getting a list of drives

Windows provides an interface for sysadmins called Windows Management Instrumentation (WMI) which basically allows you to monitor a local or remote system.

On PowerShell we can use Get-WmiObject command to interact with it. WMI Win32_LogicalDisk class gives us a list of the drives currently connected to the PC.

PS > Get-WmiObject -Class Win32_LogicalDisk

DeviceID     : C:
DriveType    : 3
ProviderName :
FreeSpace    : 21341257728
Size         : 130417745920
VolumeName   :

...

DeviceID     : E:
DriveType    : 2
ProviderName :
FreeSpace    : 48837689344
Size         : 62706155520
VolumeName   : ABDUS

That's a lot of useful information 👌. We can use DriveType to determine if a drive is removable or not.

DriveType Description
0 Unknown
1 No Root Directory
2 Removable Disk
3 Local Disk
4 Network Drive
5 Compact Disc
6 RAM Disk

Finally, we can pick the fields we need and convert the command result to JSON so that Python can easily parse it.

PS > Get-WmiObject -Class Win32_LogicalDisk | Select-Object deviceid,volumename,drivetype | ConvertTo-Json
[
  {
    "DeviceID": "C:",
    "VolumeName": "",
    "DriveType": 3
  },
  {
    "DeviceID": "E:",
    "VolumeName": "ABDUS",
    "DriveType": 2
  },
  ...
]

Now we can call this PowerShell command from Python using subprocess module and parse its JSON output:

from dataclasses import dataclass
from typing import Callable, List

@dataclass
class Drive:
    letter: str
    label: str
    drive_type: str

    @property
    def is_removable(self) -> bool:
        return self.drive_type == 'Removable Disk'

def list_drives() -> List[Drive]:
    """
    Get a list of drives using WMI
    :return: list of drives
    """
    proc = subprocess.run(
        args=[
            'powershell',
            '-noprofile',
            '-command',
            'Get-WmiObject -Class Win32_LogicalDisk | Select-Object deviceid,volumename,drivetype | ConvertTo-Json'
        ],
        text=True,
        stdout=subprocess.PIPE
    )
    if proc.returncode != 0 or not proc.stdout.strip():
        print('Failed to enumerate drives')
        return []
    devices = json.loads(proc.stdout)

    drive_types = {
        0: 'Unknown',
        1: 'No Root Directory',
        2: 'Removable Disk',
        3: 'Local Disk',
        4: 'Network Drive',
        5: 'Compact Disc',
        6: 'RAM Disk',
    }

    return [Drive(
        letter=d['deviceid'],
        label=d['volumename'],
        drive_type=drive_types[d['drivetype']]
    ) for d in devices]

if __name__ == '__main__':
    print(list_drives())

which outputs

[Drive(letter='C:', label='', drive_type='Local Disk'), ...]

OK. Now the difficult part: hooking it up to Windows.

Listening to Windows WM_DEVICECHANGE messages

Windows uses messages to notify programs of events and let them react. This includes user inputs (mouse clicks, key strokes etc.) and other OS events (hardware connected, low power, A/C adapter connected etc.).

Windows broadcasts a WM_DEVICECHANGE message when hardware configuration of the system changes. That includes plug-and-play devices, such as USB drives, printers, mouse etc. We need a way to listen to this broadcast. That requires creating a window, registering a window procedure, then running a message loop that receives messages from the operating system.

We need the pywin32 package, which provides extensions to consume Win32 APIs in Python. We can install it with pip:

pip install pywin32

To create a window, we can follow Microsoft's docs and translate the given C++ example to its Python equivalent.

import win32api, win32con, win32gui

def create_window() -> int:
    """
    Create a window for listening to messages
    https://docs.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-createwindowexa#example
    
    See also: https://docs.microsoft.com/en-us/windows/win32/learnwin32/creating-a-window#creating-the-window

    :return: window hwnd
    """
    wc = win32gui.WNDCLASS()
    wc.lpfnWndProc = print
    wc.lpszClassName = 'Demo'
    wc.hInstance = win32api.GetModuleHandle(None)
    class_atom = win32gui.RegisterClass(wc)
    return win32gui.CreateWindow(class_atom, 'Demo', 0, 0, 0, 0, 0, 0, 0, wc.hInstance, None)

if __name__ == '__main__':
    create_window()
    win32gui.PumpMessages()

The highlighted line is the critical part. It is our "window procedure" that gets called when the window receives a message. It must have the following signature:

LRESULT CALLBACK MainWndProc(
    HWND hwnd,        // handle to window
    UINT uMsg,        // message identifier
    WPARAM wParam,    // first message parameter
    LPARAM lParam)    // second message parameter
{ ... }

Python's print function accepts an arbitrary number of parameters and dumps all its arguments to console, so its signature is compatible.

Once we run this script we can see the messages received by our window:

789538 537 7 0
789538 537 32772 346174713248
...

All these integers are either a value or a pointer to a value. Let's break down the second message:

Parameter Value Hexadecimal Description
hwnd 789538 0xc0c22 our window's handle
msg 537 0x0219 WM_DEVICECHANGE message
wparam 32772 0x8004 DBT_DEVICEREMOVECOMPLETE event
lparam 346174713248 0x50999eeda0 pointer (memory address) to event info

Once we decipher the message type, we can google its hexadecimal value to find which message it corresponds to, then figure out what it contains. Here we don't really need to dereference and unpack lparam pointer, because we're using WMI for that. We just need to know something has happened.

WM_DEVICECHANGE message gives us DBT_DEVICEARRIVAL and DBT_DEVICEREMOVECOMPLETE events to notify when a device is added or removed respectively.

Now putting these all together:

Python script

I wrapped the code I've explained above inside DeviceListener class. It provides an easy way to attach your own listener and perform a task when a drive is attached/removed.

It assumes the script will be run as the main script, that's why DeviceListener.start() is blocking. You can run it inside a thread if you want it to be non-blocking.

You can also change the highlighted line to trigger the callback only when a drive is either inserted or removed.

import json
import logging
import subprocess
from dataclasses import dataclass
from typing import Callable, List

import win32api, win32con, win32gui

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Drive:
    letter: str
    label: str
    drive_type: str

    @property
    def is_removable(self) -> bool:
        return self.drive_type == 'Removable Disk'


class DeviceListener:
    """
    Listens to Win32 `WM_DEVICECHANGE` messages
    and trigger a callback when a device has been plugged in or out

    See: https://docs.microsoft.com/en-us/windows/win32/devio/wm-devicechange
    """
    WM_DEVICECHANGE_EVENTS = {
        0x0019: ('DBT_CONFIGCHANGECANCELED', 'A request to change the current configuration (dock or undock) has been canceled.'),
        0x0018: ('DBT_CONFIGCHANGED', 'The current configuration has changed, due to a dock or undock.'),
        0x8006: ('DBT_CUSTOMEVENT', 'A custom event has occurred.'),
        0x8000: ('DBT_DEVICEARRIVAL', 'A device or piece of media has been inserted and is now available.'),
        0x8001: ('DBT_DEVICEQUERYREMOVE', 'Permission is requested to remove a device or piece of media. Any application can deny this request and cancel the removal.'),
        0x8002: ('DBT_DEVICEQUERYREMOVEFAILED', 'A request to remove a device or piece of media has been canceled.'),
        0x8004: ('DBT_DEVICEREMOVECOMPLETE', 'A device or piece of media has been removed.'),
        0x8003: ('DBT_DEVICEREMOVEPENDING', 'A device or piece of media is about to be removed. Cannot be denied.'),
        0x8005: ('DBT_DEVICETYPESPECIFIC', 'A device-specific event has occurred.'),
        0x0007: ('DBT_DEVNODES_CHANGED', 'A device has been added to or removed from the system.'),
        0x0017: ('DBT_QUERYCHANGECONFIG', 'Permission is requested to change the current configuration (dock or undock).'),
        0xFFFF: ('DBT_USERDEFINED', 'The meaning of this message is user-defined.'),
    }

    def __init__(self, on_change: Callable[[List[Drive]], None]):
        self.on_change = on_change

    def _create_window(self):
        """
        Create a window for listening to messages
        https://docs.microsoft.com/en-us/windows/win32/learnwin32/creating-a-window#creating-the-window

        See also: https://docs.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-createwindoww

        :return: window hwnd
        """
        wc = win32gui.WNDCLASS()
        wc.lpfnWndProc = self._on_message
        wc.lpszClassName = self.__class__.__name__
        wc.hInstance = win32api.GetModuleHandle(None)
        class_atom = win32gui.RegisterClass(wc)
        return win32gui.CreateWindow(class_atom, self.__class__.__name__, 0, 0, 0, 0, 0, 0, 0, wc.hInstance, None)

    def start(self):
        logger.info(f'Listening to drive changes')
        hwnd = self._create_window()
        logger.debug(f'Created listener window with hwnd={hwnd:x}')
        logger.debug(f'Listening to messages')
        win32gui.PumpMessages()

    def _on_message(self, hwnd: int, msg: int, wparam: int, lparam: int):
        if msg != win32con.WM_DEVICECHANGE:
            return 0
        event, description = self.WM_DEVICECHANGE_EVENTS[wparam]
        logger.debug(f'Received message: {event} = {description}')
        if event in ('DBT_DEVICEREMOVECOMPLETE', 'DBT_DEVICEARRIVAL'):
            logger.info('A device has been plugged in (or out)')
            self.on_change(self.list_drives())
        return 0

    @staticmethod
    def list_drives() -> List[Drive]:
        """
        Get a list of drives using WMI
        :return: list of drives
        """
        proc = subprocess.run(
            args=[
                'powershell',
                '-noprofile',
                '-command',
                'Get-WmiObject -Class Win32_LogicalDisk | Select-Object deviceid,volumename,drivetype | ConvertTo-Json'
            ],
            text=True,
            stdout=subprocess.PIPE
        )
        if proc.returncode != 0 or not proc.stdout.strip():
            logger.error('Failed to enumerate drives')
            return []
        devices = json.loads(proc.stdout)

        drive_types = {
            0: 'Unknown',
            1: 'No Root Directory',
            2: 'Removable Disk',
            3: 'Local Disk',
            4: 'Network Drive',
            5: 'Compact Disc',
            6: 'RAM Disk',
        }

        return [Drive(
            letter=d['deviceid'],
            label=d['volumename'],
            drive_type=drive_types[d['drivetype']]
        ) for d in devices]


def on_devices_changed(drives: List[Drive]):
    removable_drives = [d for d in drives if d.is_removable]
    logger.debug(f'Connected removable drives: {removable_drives}')
    for drive in removable_drives:
        backup(drive)


def backup(drive: Drive):
    if drive.label != 'ABDUS':
        return
    logger.info('Backup drive has been plugged in')
    logger.info(f'Backing up {drive.letter}')


if __name__ == '__main__':
    listener = DeviceListener(on_change=on_devices_changed)
    listener.start()

When we run it and plug in a drive, it logs:

INFO:__main__:Listening to drive changes
DEBUG:__main__:Created listener window with hwnd=d60b86
DEBUG:__main__:Listening to messages
DEBUG:__main__:Received message: DBT_DEVNODES_CHANGED = A device has been added to or removed from the system.
DEBUG:__main__:Received message: DBT_DEVNODES_CHANGED = A device has been added to or removed from the system.
DEBUG:__main__:Received message: DBT_DEVICEARRIVAL = A device or piece of media has been inserted and is now available.
INFO:__main__:A device has been plugged in (or out)
DEBUG:__main__:Connected removable drives: [Drive(letter='E:', label='ABDUS', drive_type='Removable Disk')]
INFO:__main__:Backup drive has been plugged in
INFO:__main__:Backing up E:

It works 🙌.

Now what you do when your callback is called is up to you. You can:

  • Copy files to/from the drive
  • Backup photos when an SD card is inserted
  • Display a popup to warn the user
  • Lock the PC
  • Wipe the drive (🤷‍♂️ you do you)
  • ...

Alternative approach: polling for drives

This is my older attempt where I poll disks and trigger a callback when the set of drives change. It's considerably simpler, doesn't depend on pywin32 and easier to port to other operating systems (provided you know how to list drives on that platform).

import json
import subprocess
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Drive:
    letter: str
    label: str
    drive_type: str

    @property
    def is_removable(self) -> bool:
        return self.drive_type == 'Removable Disk'


def list_drives() -> List[Drive]:
    """
    Get a list of drives using WMI
    :return: list of drives
    """
    proc = subprocess.run(
        args=[
            'powershell',
            '-noprofile',
            '-command',
            'Get-WmiObject -Class Win32_LogicalDisk | Select-Object deviceid,volumename,drivetype | ConvertTo-Json'
        ],
        text=True,
        stdout=subprocess.PIPE
    )
    if proc.returncode != 0 or not proc.stdout.strip():
        print('Failed to enumerate drives')
        return []
    devices = json.loads(proc.stdout)

    drive_types = {
        0: 'Unknown',
        1: 'No Root Directory',
        2: 'Removable Disk',
        3: 'Local Disk',
        4: 'Network Drive',
        5: 'Compact Disc',
        6: 'RAM Disk',
    }

    return [Drive(
        letter=d['deviceid'],
        label=d['volumename'],
        drive_type=drive_types[d['drivetype']]
    ) for d in devices]

def watch_drives(on_change: Callable[[List[Drive]], None], poll_interval: int = 1):
    prev = None
    while True:
        drives = list_drives()
        if prev != drives:
            on_change(drives)
            prev = drives
        sleep(poll_interval)


if __name__ == '__main__':
    watch_drives(on_change=print)

You can supply your own on_change callback and perform a job if a drive is present.

Here's the output after I plug a USB drive in and out:

[Drive(letter='C:', label='', drive_type='Local Disk')]
[Drive(letter='C:', label='', drive_type='Local Disk'), Drive(letter='E:', label='ABDUS', drive_type='Removable Disk')]
[Drive(letter='C:', label='', drive_type='Local Disk')]

That's it.
Cheers ✌

If you've found this post useful, consider sharing it.

Last updated: