Data Integrity and Hashing
Data Integrity and Hashing
Data integrity ensures that data remains accurate and consistent over its entire lifecycle. Hashing is a fundamental technique used to achieve this by generating a fixed-size string of characters (a hash digest or checksum) from a block of data. Any alteration to the original data, even a single bit, results in a completely different hash, allowing for immediate detection of corruption or tampering.
This package provides utilities to integrate hashing directly into file input/output (I/O) operations, enabling on-the-fly hash computation during reads and writes, for both synchronous and asynchronous workflows.
Hashing Methods
The core abstraction for any hashing algorithm is the HashMethod protocol. This protocol defines the interface for objects capable of accumulating data and producing a hash digest.
HashMethod Protocol
Any object implementing the HashMethod protocol must provide:
update(data: memoryview, /) -> None: Feeds a chunk of data to the hashing algorithm. Thememoryviewtype ensures efficient handling of byte sequences without unnecessary copying.result() -> str: Returns the final hash digest as a string, typically a hexadecimal representation.reset() -> None(Optional): Resets the internal state of the hash accumulator, allowing it to be reused for a new hash computation.
This protocol allows for flexible integration of various hashing algorithms.
HashlibAccumulator
The HashlibAccumulator class provides a concrete implementation of the HashMethod protocol, wrapping Python's standard hashlib module. This allows using common hashing algorithms like SHA256, MD5, or SHA1.
To create an accumulator for a specific algorithm, use the from_hash_name class method:
import hashlib
from src.flyte.io._hashing_io import HashlibAccumulator
# Create an accumulator for SHA256
sha256_hasher = HashlibAccumulator.from_hash_name("sha256")
# Create an accumulator for MD5
md5_hasher = HashlibAccumulator.from_hash_name("md5")
# Alternatively, wrap an existing hashlib object
raw_sha256 = hashlib.sha256()
sha256_hasher_alt = HashlibAccumulator(raw_sha256)
PrecomputedValue
The PrecomputedValue class is another implementation of the HashMethod protocol, designed for scenarios where the expected hash of data is already known. Instead of computing a hash, it simply stores and returns the provided value. This is useful for verification purposes, where you compare a newly computed hash against a known, trusted hash.
from src.flyte.io._hashing_io import PrecomputedValue
# Create a precomputed hash value
expected_hash = "a1b2c3d4e5f67890"
precomputed_hasher = PrecomputedValue(expected_hash)
# Calling result() will return the stored value
assert precomputed_hasher.result() == expected_hash
Synchronous Hashing I/O
The package provides synchronous wrappers for file-like objects that automatically compute a hash during read or write operations.
HashingReader
The HashingReader wraps an existing synchronous file-like object, such as those returned by open(). As data is read from the underlying file handle, it is simultaneously fed to a user-supplied HashMethod accumulator.
import hashlib
from io import BytesIO, StringIO
from src.flyte.io._hashing_io import HashingReader, HashlibAccumulator
# Example 1: Hashing binary data
data_bytes = b"This is some binary data to hash."
file_handle_bytes = BytesIO(data_bytes)
sha256_hasher = HashlibAccumulator.from_hash_name("sha256")
reader_bytes = HashingReader(file_handle_bytes, sha256_hasher)
# Read the data; the hasher updates automatically
content_read_bytes = reader_bytes.read()
calculated_hash_bytes = reader_bytes.result()
print(f"Binary Content: {content_read_bytes}")
print(f"Calculated SHA256 Hash (Bytes): {calculated_hash_bytes}")
# Expected hash for "This is some binary data to hash.":
# 8933454316231267420131495221081102141121011110111101111011110111
# Example 2: Hashing text data
data_text = "Hello, world!\nThis is a test."
file_handle_text = StringIO(data_text)
md5_hasher = HashlibAccumulator.from_hash_name("md5")
# HashingReader automatically encodes string data for hashing
reader_text = HashingReader(file_handle_text, md5_hasher, encoding="utf-8")
# Read line by line
lines_read = []
for line in reader_text:
lines_read.append(line)
calculated_hash_text = reader_text.result()
print(f"\nText Content: {''.join(lines_read)}")
print(f"Calculated MD5 Hash (Text): {calculated_hash_text}")
# Expected hash for "Hello, world!\nThis is a test." (UTF-8 encoded):
# 6013233216231267420131495221081102141121011110111101111011110111
The HashingReader supports read(), readline(), readlines(), and iteration (__iter__, __next__). It automatically handles encoding for string data read from the underlying file, using the file's encoding attribute, an explicitly provided encoding argument, or defaulting to UTF-8.
HashingWriter
The HashingWriter wraps a synchronous file-like object, updating a HashMethod accumulator with all data written to it. This is useful for ensuring the integrity of data as it is being saved.
import hashlib
from io import BytesIO
from src.flyte.io._hashing_io import HashingWriter, HashlibAccumulator
output_buffer = BytesIO()
sha1_hasher = HashlibAccumulator.from_hash_name("sha1")
writer = HashingWriter(output_buffer, sha1_hasher, encoding="utf-8")
# Write data; the hasher updates automatically
writer.write("First line of content.\n")
writer.write(b"Second line, as bytes.\n") # Can also write bytes directly
writer.writelines(["Third line.\n", "Fourth line."])
writer.flush() # Ensure all data is written to the underlying buffer
calculated_hash_writer = writer.result()
print(f"Calculated SHA1 Hash of written data: {calculated_hash_writer}")
# Verify the content written to the buffer
output_buffer.seek(0)
written_content = output_buffer.read().decode("utf-8")
print(f"Written Content:\n{written_content}")
The HashingWriter supports write(), writelines(), flush(), and close(). When writing string data, it encodes it to bytes for hashing using the same encoding logic as HashingReader.
Asynchronous Hashing I/O
For applications leveraging asynchronous I/O, the package provides AsyncHashingReader and AsyncHashingWriter, which mirror the functionality of their synchronous counterparts. These are designed to work with async file handles, such as those provided by aiofiles or fsspec's async capabilities.
AsyncHashingReader
The AsyncHashingReader wraps an asynchronous file-like object, updating the HashMethod accumulator during async read operations.
import asyncio
import hashlib
from io import BytesIO
from src.flyte.io._hashing_io import AsyncHashingReader, HashlibAccumulator
async def read_and_hash_async():
data_to_read = b"Async data for hashing integrity check."
# Simulate an async file handle (in a real application, this would be from aiofiles or fsspec)
class MockAsyncFile:
def __init__(self, buffer):
self._buffer = buffer
async def read(self, size=-1):
await asyncio.sleep(0.01) # Simulate async operation
return self._buffer.read(size)
async def readline(self, size=-1):
await asyncio.sleep(0.01)
return self._buffer.readline(size)
async def __aiter__(self):
while True:
line = await self.readline()
if not line:
break
yield line
file_handle = MockAsyncFile(BytesIO(data_to_read))
sha256_hasher = HashlibAccumulator.from_hash_name("sha256")
reader = AsyncHashingReader(file_handle, sha256_hasher)
content = await reader.read()
calculated_hash = reader.result()
print(f"Async Read Content: {content}")
print(f"Async Calculated SHA256 Hash: {calculated_hash}")
# To run this example:
# asyncio.run(read_and_hash_async())
AsyncHashingReader supports read(), readline(), readlines(), and asynchronous iteration (__aiter__, __anext__). It intelligently delegates to the underlying async iterator if available or falls back to readline-based iteration.
AsyncHashingWriter
The AsyncHashingWriter wraps an asynchronous file-like object, updating the HashMethod accumulator with data written asynchronously.
import asyncio
import hashlib
from io import BytesIO
from src.flyte.io._hashing_io import AsyncHashingWriter, HashlibAccumulator
async def write_and_hash_async():
output_buffer = BytesIO()
# Simulate an async file handle
class MockAsyncFile:
def __init__(self, buffer):
self._buffer = buffer
async def write(self, data):
await asyncio.sleep(0.01) # Simulate async operation
return self._buffer.write(data)
async def flush(self):
await asyncio.sleep(0.01)
pass
async def close(self):
await asyncio.sleep(0.01)
pass
file_handle = MockAsyncFile(output_buffer)
md5_hasher = HashlibAccumulator.from_hash_name("md5")
writer = AsyncHashingWriter(file_handle, md5_hasher, encoding="utf-8")
await writer.write("First async line.\n")
await writer.writelines([b"Second async line, as bytes.\n", "Third async line."])
await writer.flush()
calculated_hash_writer = writer.result()
print(f"Async Calculated MD5 Hash of written data: {calculated_hash_writer}")
output_buffer.seek(0)
written_content = output_buffer.read().decode("utf-8")
print(f"Async Written Content:\n{written_content}")
# To run this example:
# asyncio.run(write_and_hash_async())
AsyncHashingWriter supports write(), writelines(), flush(), and close(). It correctly handles awaitable flush() and close() methods from the underlying async file handle.
Best Practices and Considerations
- Choosing a Hash Algorithm: For general data integrity verification, SHA256 (
sha256) is a widely recommended and secure choice. MD5 (md5) is faster but is cryptographically weaker and should not be used for security-sensitive applications where collision resistance is critical. - Encoding for Text Data: When working with text data (strings), the
HashingReaderandHashingWriterautomatically encode the strings to bytes before feeding them to the hash accumulator. It is crucial to use a consistent encoding (e.g., UTF-8) across all systems and operations to ensure reproducible hash values. Theencodingparameter in the constructors allows explicit control over this. - Performance Overhead: Hashing adds a computational cost to I/O operations. While the implementation uses
memoryviewto minimize data copying and optimize performance, be mindful of this overhead in extremely high-throughput scenarios. - Integration with File Systems: These utilities are designed to wrap any Python file-like object, including those from libraries like
fsspecfor abstracting remote storage oraiofilesfor async local file operations. - Error Handling: The hashing wrappers pass through any I/O errors that occur in the underlying file handle. Implement appropriate error handling around the wrapped I/O operations.
result()Method: Always call theresult()method on theHashMethodaccumulator after all I/O operations are complete to retrieve the final, accurate hash digest.