Skip to content

Commit 0475bf6

Browse files
committed
feat: Add socket pipe
1 parent 8ab5914 commit 0475bf6

File tree

7 files changed

+279
-0
lines changed

7 files changed

+279
-0
lines changed

Pipelines.Extensions/PipeReaderExtensions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using Microsoft;
2+
using Pipelines.Extensions.SocketPipe;
23
using System;
34
using System.IO.Pipelines;
5+
using System.Net.Sockets;
46
using System.Runtime.CompilerServices;
57
using System.Threading;
68
using System.Threading.Tasks;
@@ -111,5 +113,11 @@ public static async ValueTask<ReadResult> ReadAndCheckIsCanceledAsync(this PipeR
111113
result.ThrowIfCanceled(cancellationToken);
112114
return result;
113115
}
116+
117+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
118+
public static PipeReader AsPipeReader(this Socket socket, SocketPipeReaderOptions? options = null)
119+
{
120+
return new SocketPipeReader(socket, options ?? SocketPipeReaderOptions.Default);
121+
}
114122
}
115123
}

Pipelines.Extensions/PipeWriterExtensions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
using Pipelines.Extensions.SocketPipe;
12
using System;
23
using System.Buffers;
34
using System.IO.Pipelines;
5+
using System.Net.Sockets;
46
using System.Runtime.CompilerServices;
57
using System.Text;
68
using System.Threading;
@@ -86,5 +88,11 @@ public static async ValueTask<FlushResult> FlushAndCheckIsCanceledAsync(this Pip
8688
flushResult.ThrowIfCanceled(cancellationToken);
8789
return flushResult;
8890
}
91+
92+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
93+
public static PipeWriter AsPipeWriter(this Socket socket, SocketPipeWriterOptions? options = null)
94+
{
95+
return new SocketPipeWriter(socket, options ?? SocketPipeWriterOptions.Default);
96+
}
8997
}
9098
}

Pipelines.Extensions/PipelinesExtensions.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using Microsoft;
2+
using Pipelines.Extensions.SocketPipe;
23
using System.IO;
34
using System.IO.Pipelines;
5+
using System.Net.Sockets;
46
using System.Runtime.CompilerServices;
57
using System.Threading;
68
using System.Threading.Tasks;
@@ -31,5 +33,18 @@ public static IDuplexPipe AsDuplexPipe(this Stream stream,
3133

3234
return DefaultDuplexPipe.Create(reader, writer);
3335
}
36+
37+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
38+
public static IDuplexPipe AsDuplexPipe(this Socket socket,
39+
SocketPipeReaderOptions? readerOptions = null,
40+
SocketPipeWriterOptions? writerOptions = null)
41+
{
42+
Requires.Argument(socket.Connected, nameof(socket), @"Socket must be connected.");
43+
44+
var reader = socket.AsPipeReader(readerOptions);
45+
var writer = socket.AsPipeWriter(writerOptions);
46+
47+
return DefaultDuplexPipe.Create(reader, writer);
48+
}
3449
}
3550
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
using Microsoft;
2+
using Microsoft.VisualStudio.Threading;
3+
using System;
4+
using System.IO.Pipelines;
5+
using System.Net.Sockets;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace Pipelines.Extensions.SocketPipe
10+
{
11+
internal sealed class SocketPipeReader : PipeReader
12+
{
13+
public Socket InternalSocket { get; }
14+
15+
private readonly SocketPipeReaderOptions _options;
16+
private readonly Pipe _pipe;
17+
private PipeWriter Writer => _pipe.Writer;
18+
private PipeReader Reader => _pipe.Reader;
19+
20+
private readonly CancellationTokenSource _cancellationTokenSource;
21+
22+
public SocketPipeReader(Socket socket, SocketPipeReaderOptions options)
23+
{
24+
Requires.NotNull(socket, nameof(socket));
25+
Requires.Argument(socket.Connected, nameof(socket), @"Socket must be connected.");
26+
Requires.NotNull(options, nameof(options));
27+
28+
InternalSocket = socket;
29+
_options = options;
30+
_pipe = new Pipe(options.PipeOptions);
31+
_cancellationTokenSource = new CancellationTokenSource();
32+
33+
WrapWriterAsync(_cancellationTokenSource.Token).Forget();
34+
}
35+
36+
private Task WrapWriterAsync(CancellationToken cancellationToken)
37+
{
38+
return Task.Run(async () =>
39+
{
40+
try
41+
{
42+
while (true)
43+
{
44+
var memory = Writer.GetMemory(_options.SizeHint);
45+
46+
var readLength = await InternalSocket.ReceiveAsync(memory, _options.SocketFlags, cancellationToken);
47+
48+
if (readLength is 0)
49+
{
50+
break;
51+
}
52+
53+
Writer.Advance(readLength);
54+
55+
var flushResult = await Writer.FlushAndCheckIsCanceledAsync(cancellationToken);
56+
if (flushResult.IsCompleted)
57+
{
58+
break;
59+
}
60+
}
61+
62+
await Writer.CompleteAsync();
63+
}
64+
catch (Exception ex)
65+
{
66+
await Writer.CompleteAsync(ex);
67+
}
68+
}, cancellationToken);
69+
}
70+
71+
public override void AdvanceTo(SequencePosition consumed)
72+
{
73+
Reader.AdvanceTo(consumed);
74+
}
75+
76+
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
77+
{
78+
Reader.AdvanceTo(consumed, examined);
79+
}
80+
81+
public override void CancelPendingRead()
82+
{
83+
Reader.CancelPendingRead();
84+
}
85+
86+
public override void Complete(Exception? exception = null)
87+
{
88+
_cancellationTokenSource.Cancel();
89+
Reader.Complete(exception);
90+
}
91+
92+
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
93+
{
94+
return Reader.ReadAsync(cancellationToken);
95+
}
96+
97+
public override bool TryRead(out ReadResult result)
98+
{
99+
return Reader.TryRead(out result);
100+
}
101+
}
102+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
2+
using System.IO.Pipelines;
3+
using System.Net.Sockets;
4+
5+
namespace Pipelines.Extensions.SocketPipe
6+
{
7+
public class SocketPipeReaderOptions
8+
{
9+
public PipeOptions PipeOptions { get; }
10+
11+
public SocketFlags SocketFlags { get; }
12+
13+
public int SizeHint { get; }
14+
15+
internal static readonly SocketPipeReaderOptions Default = new();
16+
17+
public SocketPipeReaderOptions(PipeOptions? pipeOptions = null, SocketFlags socketFlags = SocketFlags.None, int sizeHint = 0)
18+
{
19+
PipeOptions = pipeOptions ?? PipeOptions.Default;
20+
SocketFlags = socketFlags;
21+
SizeHint = sizeHint;
22+
}
23+
}
24+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
using Microsoft;
2+
using Microsoft.VisualStudio.Threading;
3+
using System;
4+
using System.IO.Pipelines;
5+
using System.Net.Sockets;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace Pipelines.Extensions.SocketPipe
10+
{
11+
internal sealed class SocketPipeWriter : PipeWriter
12+
{
13+
public Socket InternalSocket { get; }
14+
15+
private readonly SocketPipeWriterOptions _options;
16+
private readonly Pipe _pipe;
17+
private PipeWriter Writer => _pipe.Writer;
18+
private PipeReader Reader => _pipe.Reader;
19+
20+
private readonly CancellationTokenSource _cancellationTokenSource;
21+
22+
public SocketPipeWriter(Socket socket, SocketPipeWriterOptions options)
23+
{
24+
Requires.NotNull(socket, nameof(socket));
25+
Requires.Argument(socket.Connected, nameof(socket), @"Socket must be connected.");
26+
Requires.NotNull(options, nameof(options));
27+
28+
InternalSocket = socket;
29+
_options = options;
30+
_pipe = new Pipe(options.PipeOptions);
31+
_cancellationTokenSource = new CancellationTokenSource();
32+
33+
WrapReaderAsync(_cancellationTokenSource.Token).Forget();
34+
}
35+
36+
private Task WrapReaderAsync(CancellationToken cancellationToken)
37+
{
38+
return Task.Run(async () =>
39+
{
40+
try
41+
{
42+
while (true)
43+
{
44+
var result = await Reader.ReadAndCheckIsCanceledAsync(cancellationToken);
45+
var buffer = result.Buffer;
46+
47+
foreach (var memory in buffer)
48+
{
49+
var length = await InternalSocket.SendAsync(memory, _options.SocketFlags, cancellationToken);
50+
Report.IfNot(length == memory.Length);
51+
}
52+
53+
Reader.AdvanceTo(buffer.End);
54+
55+
if (result.IsCompleted)
56+
{
57+
break;
58+
}
59+
}
60+
61+
await Reader.CompleteAsync();
62+
}
63+
catch (Exception ex)
64+
{
65+
await Reader.CompleteAsync(ex);
66+
}
67+
}, cancellationToken);
68+
}
69+
70+
public override void Advance(int bytes)
71+
{
72+
Writer.Advance(bytes);
73+
}
74+
75+
public override Memory<byte> GetMemory(int sizeHint = 0)
76+
{
77+
return Writer.GetMemory(sizeHint);
78+
}
79+
80+
public override Span<byte> GetSpan(int sizeHint = 0)
81+
{
82+
return Writer.GetSpan(sizeHint);
83+
}
84+
85+
public override void CancelPendingFlush()
86+
{
87+
Writer.CancelPendingFlush();
88+
}
89+
90+
public override void Complete(Exception? exception = null)
91+
{
92+
_cancellationTokenSource.Cancel();
93+
Writer.Complete(exception);
94+
}
95+
96+
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
97+
{
98+
return Writer.FlushAsync(cancellationToken);
99+
}
100+
}
101+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
2+
using System.IO.Pipelines;
3+
using System.Net.Sockets;
4+
5+
namespace Pipelines.Extensions.SocketPipe
6+
{
7+
public class SocketPipeWriterOptions
8+
{
9+
public PipeOptions PipeOptions { get; }
10+
11+
public SocketFlags SocketFlags { get; }
12+
13+
internal static readonly SocketPipeWriterOptions Default = new();
14+
15+
public SocketPipeWriterOptions(PipeOptions? pipeOptions = null, SocketFlags socketFlags = SocketFlags.None)
16+
{
17+
PipeOptions = pipeOptions ?? PipeOptions.Default;
18+
SocketFlags = socketFlags;
19+
}
20+
}
21+
}

0 commit comments

Comments
 (0)