SEISAMUSE

Jun Xie's blog

  Below is an update to the earthquake data download script.

import os
from obspy import UTCDateTime
from obspy.clients.fdsn import Client
from obspy.taup import TauPyModel
from obspy.geodetics import locations2degrees
from concurrent.futures import ThreadPoolExecutor, as_completed

client = Client("IRIS")
model = TauPyModel("iasp91")

output_dir = "processed_sac"
os.makedirs(output_dir, exist_ok=True)

event_file = "event.lst"
sta_file = "sta.lst"
exception_log = "exceptions.txt"
thread_workers = 4

# Read station info: net, sta, lat, lon, elev
with open(sta_file, "r") as sf:
    sta_lines = [line.strip() for line in sf if line.strip()]
sta_list = []
for line in sta_lines:
    parts = line.split("|")
    if len(parts) >= 5:
        net = parts[0].strip()
        sta = parts[1].strip()
        lat = float(parts[2])
        lon = float(parts[3])
        elev = float(parts[4])
        sta_list.append((net, sta, lat, lon, elev))

# Truncate the exception log
with open(exception_log, "w") as elog:
    elog.write("")

def process_station(event_id, origin_time, magnitude, ev_lat, ev_lon, ev_dep,
                    net, sta, sta_lat, sta_lon, sta_elev):
    try:
        dist_deg = locations2degrees(ev_lat, ev_lon, sta_lat, sta_lon)
        arrivals = model.get_travel_times(source_depth_in_km=ev_dep,
                                          distance_in_degree=dist_deg,
                                          phase_list=["P"])
        if not arrivals:
            raise Exception("No P arrival")

        p_arrival = origin_time + arrivals[0].time
        start_dl = p_arrival - 50
        end_dl = p_arrival + 150

        st = client.get_waveforms(network=net, station=sta, location="*", channel="BH?",
                                  starttime=start_dl, endtime=end_dl,
                                  attach_response=True)

        st.remove_response(output="VEL", pre_filt=(0.01, 0.02, 8, 10),
                           taper=True, zero_mean=True, taper_fraction=0.05)

        st.trim(starttime=p_arrival - 10, endtime=p_arrival + 60)

        comps = [tr.stats.channel[-1] for tr in st]
        if not all(comp in comps for comp in ["N", "E", "Z"]):
            raise Exception("Incomplete 3-component data")

        for tr in st:
            tr.stats.sac = tr.stats.get("sac", {})
            tr.stats.sac.stla = sta_lat
            tr.stats.sac.stlo = sta_lon
            tr.stats.sac.stel = sta_elev
            tr.stats.sac.evla = ev_lat
            tr.stats.sac.evlo = ev_lon
            tr.stats.sac.evdp = ev_dep
            tr.stats.sac.mag = float(magnitude) if magnitude != "NA" else 0.0

            time_tag = origin_time.strftime("%Y%m%dT%H%M%S")
            chan = tr.stats.channel
            filename = f"{time_tag}_M{magnitude}_{net}.{sta}.{chan}.sac"
            filepath = os.path.join(output_dir, filename)
            tr.write(filepath, format="SAC")

        return f"{net}.{sta} ✅"

    except Exception as e:
        with open(exception_log, "a") as elog:
            elog.write(f"{event_id} {net}.{sta} ❌ {str(e)}\n")
        return f"{net}.{sta} ❌ {str(e)}"

def process_event(line):
    results = []
    parts = line.split("|")
    if len(parts) < 10:
        return ["Skipped malformed line"]

    evid = parts[0].strip()
    time_str = parts[1].strip()
    ev_lat = float(parts[2].strip())
    ev_lon = float(parts[3].strip())
    ev_dep = float(parts[4].strip())
    mag_info = parts[8].strip()
    magnitude = mag_info.split(",")[1] if "," in mag_info else "NA"

    origin_time = UTCDateTime(time_str)
    print(f"\n🟡 Processing event {evid} M{magnitude} @ {origin_time} ({ev_lat}, {ev_lon}, {ev_dep}km)")

    try:
        task_list = []
        with ThreadPoolExecutor(max_workers=thread_workers) as executor:
            for net, sta, sta_lat, sta_lon, sta_elev in sta_list:
                task = executor.submit(process_station, evid, origin_time, magnitude,
                                       ev_lat, ev_lon, ev_dep, net, sta, sta_lat, sta_lon, sta_elev)
                task_list.append(task)

            for task in as_completed(task_list):
                results.append(task.result())

    except Exception as e:
        with open(exception_log, "a") as elog:
            elog.write(f"{evid} XB ERROR: {str(e)}\n")
        results.append(f"⚠️ Failed to process event {evid}: {e}")
    return results

# Read the event list and run
with open(event_file, "r") as f:
    event_lines = [line.strip() for line in f if line.strip()]

for line in event_lines:
    results = process_event(line)
    for r in results:
        print(r)

  The difference from the earlier script (the earthquake data download exercise) is the extra file sta.lst, whose format is:
TA|140A|32.6408|-93.573997|56.0|
network|station|latitude|longitude|elevation
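
  If you do not already have such a station file, one way to generate it is with obspy's FDSN station service. A minimal sketch (the network code "TA" and the time window are placeholders, not from the original post):

from obspy import UTCDateTime
from obspy.clients.fdsn import Client

client = Client("IRIS")
# Hypothetical query: adjust network/station/time window to your case
inv = client.get_stations(network="TA", station="*",
                          starttime=UTCDateTime(2006, 1, 1),
                          endtime=UTCDateTime(2007, 5, 1),
                          level="station")
with open("sta.lst", "w") as f:
    for net in inv:
        for sta in net:
            # Same pipe-separated layout as above: net|sta|lat|lon|elev|
            f.write(f"{net.code}|{sta.code}|{sta.latitude}|{sta.longitude}|{sta.elevation}|\n")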

  Sometimes you need to grab the data points from a figure in someone else's paper. How? Ask the author, of course! But sometimes that is not an option (you cannot reach the author, the author has lost the data, the author will not share, or the author tells you to extract them yourself), or you do not actually need the exact values. In that case you can pick the points off the figure yourself. I used to have a Perl script for this but cannot find it anymore (which again shows why organizing files and keeping notes matters). These days Python makes it easy: just use pynput's Listener. A script that records mouse clicks:

from pynput.mouse import Button, Listener
import logging

# Configure logging
logging.basicConfig(
    filename="mouse_clicks.log",
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S"
)

def on_click(x, y, button, pressed):
    if pressed:
        message = f"{x} {y}"
        print(message)         # echo to the screen
        logging.info(message)  # append to the log file
        if button == Button.right:
            # Stop the listener
            return False

# Start the mouse listener
with Listener(on_click=on_click) as listener:
    try:
        listener.join()
    except KeyboardInterrupt:
        print("Listener interrupted by user.")

  This script records the position of every left-button click and writes it to mouse_clicks.log, which has three columns:

2025-05-07T17:01:33 1977 1576

These are the click time and the x and y coordinates. Note that y grows downward (larger at the bottom of the screen, smaller at the top). A right click, or Ctrl+C, stops the recording. My usual procedure: click the bottom-left corner of the figure to get (x0, y0), the bottom-right corner to get (x1, y0), the top-left corner to get (x0, y1), and then click the points you want. With mouse_clicks.log in hand you can plot like this (GMT):

dat=mouse_clicks.log
# Reference points x0, y0, x1, y1
x0=`awk 'NR==1{print $2}' ${dat}`
y0=`awk 'NR==1{print $3}' ${dat}`
x1=`awk 'NR==2{print $2}' ${dat}`
y1=`awk 'NR==3{print $3}' ${dat}`
# Axis lengths in screen units
xs=`echo "$x1-$x0" |bc`
ys=`echo "$y0-$y1" |bc`

# NR>3 skips the three calibration clicks
awk -v x0=$x0 -v y0=$y0 -v xs=$xs -v ys=$ys 'NR>3{print ($2-x0)/xs*2000-1000,(y0-$3)/ys*1500}' ${dat} | gmt psxy -R -J -O -K -W3p,black >>$ps
# Here 2000 and 1500 are the real axis extents, and -1000 shifts x to the real origin.

  Note that this method only works for linear Cartesian axes; it does not work for any other projection.
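  For reference, here is the same pixel-to-data mapping written out in Python; a minimal sketch, where the axis limits (x_min, x_max, y_min, y_max) are placeholders you set to your figure's real axis ranges:

def pixel_to_data(px, py, x0, y0, x1, y1, x_min, x_max, y_min, y_max):
    # (x0, y0): bottom-left click; (x1, y0): bottom-right; (x0, y1): top-left.
    # Screen y grows downward, hence the (y0 - py) term.
    x = x_min + (px - x0) / (x1 - x0) * (x_max - x_min)
    y = y_min + (y0 - py) / (y0 - y1) * (y_max - y_min)
    return x, y

# Example matching the GMT script above: x axis -1000..1000, y axis 0..1500
# print(pixel_to_data(1977, 1576, x0, y0, x1, y1, -1000, 1000, 0, 1500))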
  From now on I should append this line: scripts/programs come with no warranty; use at your own risk.

  Drawing arrows with GMT is quite common. The GMT documentation and the GMT Chinese manual are great, but they are useless when the network is down, so I will record it here for easy lookup. The arrow symbol is Sv (or SV); tweak the modifiers after Sv and check the effect. The input columns are x, y, angle, length: x, y are the starting coordinates of the arrow, angle is the direction it points (counterclockwise positive, 0 along the positive x axis), and length is just the length.

echo 0 0 60 0.5i | gmt psxy -R -J -O -K -Sv0.25c+e0.2c -W0.5p -Gblack >>$ps
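
  If I read the GMT convention correctly, uppercase -SV takes the angle column as a geographic azimuth (clockwise from north) rather than a Cartesian direction, which is handy on map projections; e.g.:

echo 110 25 30 0.5i | gmt psxy -R -J -O -K -SV0.25c+e0.2c -W0.5p -Gblack >>$ps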

  Receiver functions and other methods need earthquake waveform data. Here is an example that combines FetchEvent and obspy for the download.
  FetchEvent can be fetched from https://github.com/EarthScope/fetch-scripts; it is a Perl script. An example run:

./FetchEvent -s 2006-01-01 -e 2007-05-01 --radius 5:12:95:28 --mag 5:10 -o event.lst

  This requests events occurring between 2006-01-01 and 2007-05-01, located 28–95 degrees from the point (5, 12) (lat, lon), with magnitudes 5–10, and saves the result to event.lst.
  The downloaded event.lst looks like this:

8037834 |2006/01/23 20:50:46.19Z |  6.9168| -77.7951|  24.0355|ISC|||MS,5.86,ISC|mb,6.16,ISC|Near west coast of Colombia

  The fields are: ID|time|latitude|longitude|depth|catalog|||magnitude type,magnitude,catalog|magnitude type,magnitude,catalog|region.
  As you can see, the event information comes from the ISC catalog; searching the ISC site directly is also convenient.
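  If FetchEvent is not at hand, a roughly equivalent query can be made with obspy's FDSN event client; a sketch (the returned Catalog object is structured differently from FetchEvent's text output):

from obspy import UTCDateTime
from obspy.clients.fdsn import Client

client = Client("IRIS")
cat = client.get_events(starttime=UTCDateTime(2006, 1, 1),
                        endtime=UTCDateTime(2007, 5, 1),
                        latitude=5, longitude=12,
                        minradius=28, maxradius=95,
                        minmagnitude=5, maxmagnitude=10)
print(cat)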

  Next, download three-component data for every station of the XB network, from 50 s before to 150 s after the theoretical P arrival, with the P arrival time computed via taup. Then remove the instrument response and finally cut the traces from 10 s before to 60 s after the P arrival. Save in SAC format, one file per event and station, with the event time, magnitude and station name in the file name. ThreadPoolExecutor is used for multithreaded speed-up. The script:

import os
from obspy import UTCDateTime
from obspy.clients.fdsn import Client
from obspy.taup import TauPyModel
from obspy.geodetics import locations2degrees
from concurrent.futures import ThreadPoolExecutor, as_completed

client = Client("IRIS")
model = TauPyModel("iasp91")

output_dir = "processed_sac"
os.makedirs(output_dir, exist_ok=True)

event_file = "event.lst"
exception_log = "exceptions.txt"
thread_workers = 4  # number of threads, tune as needed

with open(event_file, "r") as f:
    event_lines = [line.strip() for line in f if line.strip()]

with open(exception_log, "w") as elog:
    elog.write("")

def process_station(event_id, origin_time, magnitude, ev_lat, ev_lon, ev_dep, net, sta, sta_lat, sta_lon, sta_elev):
    try:
        # Epicentral distance via locations2degrees
        dist_deg = locations2degrees(ev_lat, ev_lon, sta_lat, sta_lon)
        arrivals = model.get_travel_times(source_depth_in_km=ev_dep,
                                          distance_in_degree=dist_deg,
                                          phase_list=["P"])
        if not arrivals:
            raise Exception("No P arrival")

        p_arrival = origin_time + arrivals[0].time
        start_dl = p_arrival - 50
        end_dl = p_arrival + 150

        # Download three-component data
        st = client.get_waveforms(network=net, station=sta, location="*", channel="BH?",
                                  starttime=start_dl, endtime=end_dl,
                                  attach_response=True)

        # Remove the instrument response
        st.remove_response(output="VEL", pre_filt=(0.01, 0.02, 8, 10),
                           taper=True, zero_mean=True, taper_fraction=0.05)

        # Cut the final window
        st.trim(starttime=p_arrival - 10, endtime=p_arrival + 60)

        # Check that all three components are present
        if not all(comp in [tr.stats.channel[-1] for tr in st] for comp in ["N", "E", "Z"]):
            raise Exception("Incomplete 3-component data")

        # Write SAC files with header information
        for tr in st:
            tr.stats.sac = tr.stats.get("sac", {})
            tr.stats.sac.stla = sta_lat
            tr.stats.sac.stlo = sta_lon
            tr.stats.sac.stel = sta_elev
            tr.stats.sac.evla = ev_lat
            tr.stats.sac.evlo = ev_lon
            tr.stats.sac.evdp = ev_dep
            tr.stats.sac.mag = float(magnitude) if magnitude != "NA" else 0.0

            time_tag = origin_time.strftime("%Y%m%dT%H%M%S")
            chan = tr.stats.channel
            filename = f"{time_tag}_M{magnitude}_{net}.{sta}.{chan}.sac"
            filepath = os.path.join(output_dir, filename)
            tr.write(filepath, format="SAC")
        return f"{net}.{sta} ✅"
    except Exception as e:
        with open(exception_log, "a") as elog:
            elog.write(f"{event_id} {net}.{sta} ❌ {str(e)}\n")
        return f"{net}.{sta} ❌ {str(e)}"

def process_event(line):
    results = []
    parts = line.split("|")
    if len(parts) < 10:
        return ["Skipped malformed line"]

    evid = parts[0].strip()
    time_str = parts[1].strip()
    ev_lat = float(parts[2].strip())
    ev_lon = float(parts[3].strip())
    ev_dep = float(parts[4].strip())
    mag_info = parts[8].strip()
    magnitude = mag_info.split(",")[1] if "," in mag_info else "NA"

    origin_time = UTCDateTime(time_str)
    print(f"\n🟡 Processing event {evid} M{magnitude} @ {origin_time} ({ev_lat}, {ev_lon}, {ev_dep}km)")

    try:
        inventory = client.get_stations(network="XB", starttime=origin_time,
                                        endtime=origin_time + 3600,
                                        level="station", channel="BH?")
        task_list = []
        with ThreadPoolExecutor(max_workers=thread_workers) as executor:
            for network in inventory:
                for station in network:
                    sta_lat = station.latitude
                    sta_lon = station.longitude
                    sta_elev = station.elevation
                    sta = station.code
                    net = network.code
                    task = executor.submit(process_station, evid, origin_time, magnitude,
                                           ev_lat, ev_lon, ev_dep, net, sta, sta_lat, sta_lon, sta_elev)
                    task_list.append(task)

            for task in as_completed(task_list):
                results.append(task.result())

    except Exception as e:
        with open(exception_log, "a") as elog:
            elog.write(f"{evid} XB ERROR: {str(e)}\n")
        results.append(f"⚠️ Failed to process event {evid}: {e}")
    return results

# Process all events
for line in event_lines:
    results = process_event(line)
    for r in results:
        print(r)

Using my script I downloaded a pile of miniSEED files into the seed folder, with the instrument response files in the resp folder. The following removes the instrument response with obspy and saves the result as miniSEED again.

import os
from glob import glob
from obspy import read, read_inventory

# Paths
seed_folder = "seed"
resp_folder = "resp"
output_folder = "seed_corrected"
os.makedirs(output_folder, exist_ok=True)

seed_files = glob(os.path.join(seed_folder, "*.seed"))  # all seed files
for seed_file in seed_files:
    print(f"Processing {os.path.basename(seed_file)}")
    try:
        st = read(seed_file)  # read the MiniSEED data
        for tr in st:
            net = tr.stats.network
            sta = tr.stats.station
            loc = tr.stats.location
            cha = tr.stats.channel
            # Build the RESP file path
            resp_file = os.path.join(resp_folder, f"RESP.{net}.{sta}.{loc}.{cha}")
            if not os.path.exists(resp_file):
                print(f"RESP file not found: {resp_file}, skipping trace.")
                continue
            inventory = read_inventory(resp_file)  # read the RESP file as an inventory
            # Remove the instrument response
            tr.remove_response(inventory=inventory, output="VEL",
                               pre_filt=[0.001, 0.002, 0.3, 0.45],
                               zero_mean=True, taper=True)
        output_file = os.path.join(output_folder, os.path.basename(seed_file))  # output file name
        st.write(output_file, format="MSEED")
        print(f"Saved corrected data to {output_file}\n")
    except Exception as e:
        print(f"Error processing {seed_file}: {e}")

Thanks to Seisman for writing FnetPy, which makes downloading the data very convenient.
Installation:

pip install https://github.com/seisman/FnetPy/archive/master.zip

Example:

import os
from datetime import datetime, timedelta
from obspy import read, read_inventory
from obspy.io.xseed import Parser
from FnetPy import Client

client = Client('username', 'password')

current_date = datetime(2013, 1, 1, 0, 0)
#current_date = datetime(2004, 12, 30, 0, 0)
end_date = datetime(2025, 1, 1, 0, 0)
output_dir = './seed'
raw_dir = './raw'
resp_dir = './resp'
os.makedirs(output_dir, exist_ok=True)
os.makedirs(raw_dir, exist_ok=True)
os.makedirs(resp_dir, exist_ok=True)

while current_date < end_date:
    start_time = current_date
    end_time = current_date + timedelta(seconds=86400 + 3600)
    start_str = current_date.strftime("%Y_%m_%d_%H_%M_%S")
    name = start_str + ".seed"
    fname = os.path.join(raw_dir, name)
    output_file = os.path.join(output_dir, name)
    if os.path.exists(output_file):
        print(f"[skip] file already exists: {output_file}")
        current_date += timedelta(days=1)
        continue
    open(output_file, 'wb').close()  # create an empty placeholder file
    if not os.path.exists(fname):
        print('download data from ' + str(start_time) + ' to ' + str(end_time))
        client.get_waveform(start_time, duration_in_seconds=86400 + 3600,
                            component="LHZ", filename=fname)  # download
        print('download ' + fname + ' done!')
    st = read(fname)        # read the SEED volume
    parser = Parser(fname)  # parse it to extract the responses
    parser.write_resp(resp_dir, zipped=False)
    for tr in st:
        resp_file = f"RESP.{tr.get_id()}"
        inv = read_inventory(os.path.join(resp_dir, resp_file))
        tr.remove_response(inventory=inv, output="VEL")
        #print(tr.stats)
    st.write(output_file, format="MSEED")
    current_date += timedelta(days=1)
print('Finished')

This script downloads continuous LHZ waveforms for all F-net stations from 2013 to 2025; each download is 25 hours long, i.e. consecutive days overlap by 1 hour. The velocity records with instrument response removed go into the seed folder, one file per day, named %Y_%m_%d_%H_%M_%S.seed. You need to apply to NIED for an account (or just use mine).
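
If you later want each file cut back to exactly 24 hours (dropping the 1-hour overlap), a minimal sketch, assuming the window start time is encoded in the file name as above; seed_24h/ is a hypothetical output folder:

import os
from glob import glob
from datetime import datetime
from obspy import read, UTCDateTime

os.makedirs("seed_24h", exist_ok=True)
for f in sorted(glob("seed/*.seed")):
    # Recover the window start from the name, e.g. 2013_01_01_00_00_00.seed
    t0 = UTCDateTime(datetime.strptime(os.path.basename(f)[:-5], "%Y_%m_%d_%H_%M_%S"))
    st = read(f)
    st.trim(starttime=t0, endtime=t0 + 86400)  # keep exactly one day
    st.write(os.path.join("seed_24h", os.path.basename(f)), format="MSEED")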

An example of downloading continuous waveforms with obspy's MassDownloader.

import obspy
from obspy.clients.fdsn.mass_downloader import RectangularDomain, CircularDomain, \
    Restrictions, MassDownloader

# A rectangular domain could be used instead:
#domain = RectangularDomain(minlatitude=-15, maxlatitude=18,
#                           minlongitude=10, maxlongitude=22)
domain = CircularDomain(
    latitude=0,
    longitude=0,
    minradius=0.0,
    maxradius=12.0
)
restrictions = Restrictions(
    # Time window of the request.
    starttime=obspy.UTCDateTime(2005, 1, 15),
    endtime=obspy.UTCDateTime(2007, 10, 15),
    # Chunk it to have one file per day.
    chunklength_in_sec=86400,
    # Considering the enormous amount of data associated with continuous
    # requests, you might want to limit the data based on SEED identifiers.
    # If the location code is specified, the location priority list is not
    # used; the same is true for the channel argument and priority list.
    network="??", station="????", location="*", channel="LHZ",
    # The typical use case for such a data set are noise correlations where
    # gaps are dealt with at a later stage.
    reject_channels_with_gaps=False,
    # Same is true with the minimum length. All data might be useful.
    minimum_length=0.0,
    # Guard against the same station having different names.
    minimum_interstation_distance_in_m=100.0)
# Restrict the number of providers if you know which serve the desired
# data. If in doubt just don't specify - then all providers will be
# queried.
#mdl = MassDownloader(providers=["IRIS","USGS","GFZ"])
mdl = MassDownloader()
mdl.download(domain, restrictions, mseed_storage="waveforms",
             stationxml_storage="stations")
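
Note that mseed_storage and stationxml_storage also accept format strings (or functions), which MassDownloader expands per channel, so the folder layout can be controlled directly; e.g.:

mdl.download(domain, restrictions,
             mseed_storage=("waveforms/{network}/{station}/"
                            "{channel}.{location}.{starttime}.{endtime}.mseed"),
             stationxml_storage="stations/{network}.{station}.xml")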

  Finally, some related tools:
  1. SSA2py: an emergent Python-based package for fast analysis of the seismic rupture, enabling near-real-time identification of rupture characteristics after a significant event.
  2. Fast matched filter: seismic matched-filter search for both CPU and GPU architectures.
  3. MUSIC Teleseismic Back-Projection: back-projection imaging of large earthquakes recorded by large-scale dense arrays.
  4. Seismic_BPMF: a fully automated workflow for earthquake detection and location with backprojection and matched-filtering methods.
  5. ARRU_seismic_backprojection: the ARRU phase picker, an attention recurrent-residual U-Net for picking seismic P- and S-phase arrivals.
  6. earthquake_back_projection: back-projection of high-frequency radiation from the earthquake source using multiple arrays, following Ishii (2012).
  7. earthquake_detection: code for the earthquake detection and location method of Beauce et al. (2019), DOI: 10.1029/2019JB018110, with a real-data example.
  8. beampower: fast routines for seismic backprojection/beamforming on both CPU and GPU architectures.
  9. ILSI (Iterative Linear Stress Inversion): a Python package for stress tensor inversion.
  10. Pyrocko: a Python framework for efficient use of pre-computed Green's functions in seismological and other physical forward and inverse source problems.