Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a3e1ea2b44 | |||
| 6686fcdded |
@@ -37,13 +37,30 @@ def call(method, url, token=None, data=None, content_type=None, want_json=True):
|
|||||||
headers["Authorization"] = f"Bearer {token}"
|
headers["Authorization"] = f"Bearer {token}"
|
||||||
if content_type:
|
if content_type:
|
||||||
headers["Content-Type"] = content_type
|
headers["Content-Type"] = content_type
|
||||||
req = urllib.request.Request(url, data=data, method=method, headers=headers)
|
# Transient-fault retries: googleapis.com occasionally drops the TLS session ("EOF
|
||||||
try:
|
# occurred in violation of protocol" — failed two release uploads on 2026-07-02) or
|
||||||
with urllib.request.urlopen(req, timeout=300) as r:
|
# answers 5xx. Retry those with backoff; 4xx raises immediately (a real API error).
|
||||||
body = r.read()
|
# The edits API is transactional until commit, so re-sending any of these is safe.
|
||||||
except urllib.error.HTTPError as e:
|
last = None
|
||||||
raise ApiError(e.code, method, url, e.read().decode("utf-8", "replace"))
|
for attempt in range(4):
|
||||||
return json.loads(body) if (want_json and body) else body
|
if attempt:
|
||||||
|
delay = 3**attempt
|
||||||
|
print(f"transient Play API failure ({last}); retry {attempt}/3 in {delay}s")
|
||||||
|
time.sleep(delay)
|
||||||
|
req = urllib.request.Request(url, data=data, method=method, headers=headers)
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(req, timeout=300) as r:
|
||||||
|
body = r.read()
|
||||||
|
return json.loads(body) if (want_json and body) else body
|
||||||
|
except urllib.error.HTTPError as e:
|
||||||
|
if e.code >= 500:
|
||||||
|
last = f"HTTP {e.code}"
|
||||||
|
continue
|
||||||
|
raise ApiError(e.code, method, url, e.read().decode("utf-8", "replace"))
|
||||||
|
except urllib.error.URLError as e:
|
||||||
|
last = str(getattr(e, "reason", e))
|
||||||
|
continue
|
||||||
|
sys.exit(f"ERROR: {method} {url} still failing after retries: {last}")
|
||||||
|
|
||||||
|
|
||||||
def load_sa():
|
def load_sa():
|
||||||
|
|||||||
@@ -820,8 +820,10 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn sender_delivers_batches() {
|
fn sender_delivers_batches() {
|
||||||
let rx_sock = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let rx_sock = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
|
// Generous: on a CI host saturated by parallel release builds, this thread can be
|
||||||
|
// starved for whole seconds between recv() wakeups.
|
||||||
rx_sock
|
rx_sock
|
||||||
.set_read_timeout(Some(Duration::from_secs(3)))
|
.set_read_timeout(Some(Duration::from_secs(10)))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let tx_sock = UdpSocket::bind("127.0.0.1:0").unwrap();
|
let tx_sock = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||||
tx_sock.connect(rx_sock.local_addr().unwrap()).unwrap();
|
tx_sock.connect(rx_sock.local_addr().unwrap()).unwrap();
|
||||||
@@ -837,10 +839,15 @@ mod tests {
|
|||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// 3 frames of 100 packets, content-tagged for verification.
|
// 3 frames of 20 packets, content-tagged for verification. The TOTAL burst must fit
|
||||||
|
// the receive socket's DEFAULT buffer even if this thread never drains concurrently
|
||||||
|
// (a starved CI runner): a 1200 B datagram costs ~2.5 KB kernel truesize, and the
|
||||||
|
// default rmem (~212 KB) holds only ~80 — a bigger burst gets silently dropped by
|
||||||
|
// the kernel and the test can never complete (the old 3×100 flaked exactly there).
|
||||||
|
const PER_FRAME: usize = 20;
|
||||||
let mut sent = Vec::new();
|
let mut sent = Vec::new();
|
||||||
for f in 0..3u8 {
|
for f in 0..3u8 {
|
||||||
let batch: PacketBatch = (0..100u8)
|
let batch: PacketBatch = (0..PER_FRAME as u8)
|
||||||
.map(|i| {
|
.map(|i| {
|
||||||
let mut p = vec![0u8; 1200];
|
let mut p = vec![0u8; 1200];
|
||||||
p[0] = f;
|
p[0] = f;
|
||||||
@@ -859,10 +866,10 @@ mod tests {
|
|||||||
let n = rx_sock.recv(&mut buf).expect("packet within timeout");
|
let n = rx_sock.recv(&mut buf).expect("packet within timeout");
|
||||||
assert_eq!(n, 1200);
|
assert_eq!(n, 1200);
|
||||||
let (f, i) = (buf[0] as usize, buf[1] as usize);
|
let (f, i) = (buf[0] as usize, buf[1] as usize);
|
||||||
assert_eq!(&buf[..n], &sent[f * 100 + i][..], "payload intact");
|
assert_eq!(&buf[..n], &sent[f * PER_FRAME + i][..], "payload intact");
|
||||||
got += 1;
|
got += 1;
|
||||||
}
|
}
|
||||||
assert_eq!(got, 300);
|
assert_eq!(got, 3 * PER_FRAME);
|
||||||
assert!(running.load(Ordering::SeqCst), "no spurious client-gone");
|
assert!(running.load(Ordering::SeqCst), "no spurious client-gone");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user