Extracting Blockchain Data
Stryg for at vise menuen
When you need to analyze blockchain activity, the first step is to collect historical data. There are several methods for accessing this data. You can interact directly with a blockchain node using its JSON-RPC API, which allows you to query for blocks, transactions, and account information. Many blockchains also offer public APIs—these can make it easier to fetch data without running your own node, but may have rate limits or restrictions. For more advanced or large-scale analysis, querying your own full node is often preferred, as it gives you full control and access to all the data.
import requests
def get_block_by_number(node_url, block_number):
payload = {
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": [hex(block_number), True],
"id": 1
}
response = requests.post(node_url, json=payload)
return response.json()["result"]
def extract_transactions_from_blocks(node_url, start_block, end_block):
blocks_data = []
for block_num in range(start_block, end_block + 1):
block = get_block_by_number(node_url, block_num)
if block:
blocks_data.append({
"blockNumber": block["number"],
"transactions": block["transactions"]
})
return blocks_data
# Example usage:
node_url = "http://localhost:8545"
start_block = 1000000
end_block = 1000002
blocks = extract_transactions_from_blocks(node_url, start_block, end_block)
for block in blocks:
print(f"Block: {block['blockNumber']}")
for tx in block["transactions"]:
print(f" Tx Hash: {tx['hash']}, From: {tx['from']}, To: {tx.get('to')}")
In this extraction logic, you use a loop to iterate over a range of block numbers. For each block, you send a JSON-RPC request to the node to fetch the block and all its transactions. The data is stored in a list of dictionaries, each containing the block number and a list of transactions. This structure makes it easy to process or filter transactions later, since you can access all transactions in a given block by iterating through the list. Iterating over blocks is a common approach when you want to scan the chain for specific events or activity.
def filter_transactions_by_address(blocks_data, address):
filtered = []
for block in blocks_data:
for tx in block["transactions"]:
if tx["from"].lower() == address.lower() or (tx.get("to") and tx["to"].lower() == address.lower()):
filtered.append(tx)
return filtered
# Example usage:
address = "0x1234567890abcdef1234567890abcdef12345678"
filtered_txs = filter_transactions_by_address(blocks, address)
for tx in filtered_txs:
print(f"Filtered Tx Hash: {tx['hash']} From: {tx['from']} To: {tx.get('to')}")
Tak for dine kommentarer!
Spørg AI
Spørg AI
Spørg om hvad som helst eller prøv et af de foreslåede spørgsmål for at starte vores chat